Kafka Network Layer Explained
Published: 2019-06-28


As we know, Kafka runs over TCP connections. Unlike much other middleware, it does not use Netty as its TCP server; instead it built its own layer on top of Java NIO. The reasons Kafka chose not to adopt Netty are discussed in a separate article.

Readers unfamiliar with Java NIO should start with an introduction to it; this article assumes some familiarity with NIO.


Key Classes

First, a look at the Kafka client's network-layer architecture (the diagram below is taken from another article):

[Figure: Kafka client network-layer architecture]

This article focuses on the Network layer.

The Network layer has two important classes: Selector and KafkaChannel.

They are loosely analogous to java.nio.channels.Selector and Channel in the Java NIO layer.

Selector's key fields are as follows:

// The JDK NIO Selector
java.nio.channels.Selector nioSelector;
// All connections known to this Selector, keyed by connection id
Map<String, KafkaChannel> channels;
// Sends that have been fully written out
List<Send> completedSends;
// Receives that have been fully read in
List<NetworkReceive> completedReceives;
// Receives not yet fully handed over; invisible to the upper layer
Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives;
// On the client side: connections for which connect() returned true immediately
Set<SelectionKey> immediatelyConnectedKeys;
// Connections that have completed connecting
List<String> connected;
// Maximum size of a single receive
int maxReceiveSize;

Seen from the network layer, Kafka divides into a client side (producers and consumers; a broker acting as a follower is also a client) and a server side (brokers). This article analyzes how the client side establishes connections and sends and receives data. The server relies on Selector and KafkaChannel for network transport as well; at the Network layer the two sides differ very little.

Establishing a Connection

When the Kafka client starts, it establishes connections by calling Selector#connect (throughout this article, unless noted otherwise, Selector means org.apache.kafka.common.network.Selector).

public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
    if (this.channels.containsKey(id))
        throw new IllegalStateException("There is already a connection for id " + id);
    // Create a SocketChannel
    SocketChannel socketChannel = SocketChannel.open();
    // Set it to non-blocking mode
    socketChannel.configureBlocking(false);
    // Create the socket and set related options
    Socket socket = socketChannel.socket();
    socket.setKeepAlive(true);
    if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
        socket.setSendBufferSize(sendBufferSize);
    if (receiveBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
        socket.setReceiveBufferSize(receiveBufferSize);
    socket.setTcpNoDelay(true);
    boolean connected;
    try {
        // SocketChannel#connect initiates the TCP handshake with the remote end.
        // Because the channel is non-blocking, the connection may not be established
        // (i.e. the 3-way handshake finished) when this method returns. It returns true
        // if the connection is already established, false otherwise. If server and client
        // are on the same machine, it may well return true.
        connected = socketChannel.connect(address);
    } catch (UnresolvedAddressException e) {
        socketChannel.close();
        throw new IOException("Can't resolve address: " + address, e);
    } catch (IOException e) {
        socketChannel.close();
        throw e;
    }
    // Register for the CONNECT event
    SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);
    KafkaChannel channel;
    try {
        // Build a KafkaChannel
        channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
    } catch (Exception e) {
        ...
    }
    // Attach the KafkaChannel to the SelectionKey
    key.attach(channel);
    // Put it into the map; id is the name of the remote server
    this.channels.put(id, channel);
    // connected == true means this connection will never fire a CONNECT event,
    // so it has to be handled separately
    if (connected) {
        // OP_CONNECT won't trigger for immediately connected channels
        log.debug("Immediately connected to node {}", channel.id());
        // Add it to a dedicated collection
        immediatelyConnectedKeys.add(key);
        // Stop listening for CONNECT events on this connection
        key.interestOps(0);
    }
}

The flow here is close to the standard NIO flow. What deserves a separate note is the case where socketChannel#connect returns true; the method's Javadoc mentions:

If this channel is in non-blocking mode then an invocation of this method initiates a non-blocking connection operation. If the connection is established immediately, as can happen with a local connection, then this method returns true. Otherwise this method returns false and the connection operation must later be completed by invoking the {@link #finishConnect finishConnect} method.

In other words, in non-blocking mode a local connection may be established right away, in which case the method returns true and no CONNECT event will ever fire for it afterwards. Kafka therefore records these special connections in a separate collection, immediatelyConnectedKeys, and gives them special treatment in the steps that follow.
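To see the immediately-connected case in isolation, here is a minimal, self-contained NIO sketch (not Kafka code; it assumes some TCP server is listening on 127.0.0.1:9092) that follows the same pattern:

import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

public class ConnectDemo {
    public static void main(String[] args) throws Exception {
        Selector selector = Selector.open();
        SocketChannel ch = SocketChannel.open();
        ch.configureBlocking(false);
        // On a loopback address this may complete at once and return true
        boolean connected = ch.connect(new InetSocketAddress("127.0.0.1", 9092));
        SelectionKey key = ch.register(selector, SelectionKey.OP_CONNECT);
        if (connected) {
            // OP_CONNECT will never fire for this channel; track it out of band,
            // just like Kafka's immediatelyConnectedKeys collection
            key.interestOps(0);
            System.out.println("immediately connected");
        } else {
            System.out.println("waiting for OP_CONNECT");
        }
    }
}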

Afterwards, the poll method is called to listen for network events:

public void poll(long timeout) throws IOException {
    ...
    // select() is a thin wrapper around java.nio.channels.Selector#select
    int readyKeys = select(timeout);
    ...
    // If there are ready events or immediatelyConnectedKeys is non-empty
    if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
        // Handle the ready events; the 2nd argument is false
        pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
        // Handle immediatelyConnectedKeys; the 2nd argument is true
        pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
    }
    addToCompletedReceives();
    ...
}

private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys, boolean isImmediatelyConnected, long currentTimeNanos) {
    Iterator<SelectionKey> iterator = selectionKeys.iterator();
    // Iterate over the set
    while (iterator.hasNext()) {
        SelectionKey key = iterator.next();
        // Remove the current element, otherwise the next poll would process it again
        iterator.remove();
        // Get the KafkaChannel created during connect
        KafkaChannel channel = channel(key);
        ...
        try {
            // If we are handling an element of immediatelyConnectedKeys, or a CONNECT event
            if (isImmediatelyConnected || key.isConnectable()) {
                // finishConnect adds a listener for READ events
                if (channel.finishConnect()) {
                    this.connected.add(channel.id());
                    this.sensors.connectionCreated.record();
                    ...
                } else
                    continue;
            }
            // SSL connections need a few extra steps
            if (channel.isConnected() && !channel.ready())
                channel.prepare();
            // READ event
            if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
                NetworkReceive networkReceive;
                while ((networkReceive = channel.read()) != null)
                    addToStagedReceives(channel, networkReceive);
            }
            // WRITE event
            if (channel.ready() && key.isWritable()) {
                Send send = channel.write();
                if (send != null) {
                    this.completedSends.add(send);
                    this.sensors.recordBytesSent(channel.id(), send.size());
                }
            }
            // If the connection is no longer valid
            if (!key.isValid())
                close(channel, true);
        } catch (Exception e) {
            String desc = channel.socketDescription();
            if (e instanceof IOException)
                log.debug("Connection with {} disconnected", desc, e);
            else
                log.warn("Unexpected error from {}; closing connection", desc, e);
            close(channel, true);
        } finally {
            maybeRecordTimePerConnection(channel, channelStartTimeNanos);
        }
    }
}

Because connections in immediatelyConnectedKeys never trigger a CONNECT event, poll calls finishConnect separately on those channels. In plaintext transport mode this lands in PlaintextTransportLayer#finishConnect, implemented as follows:

public boolean finishConnect() throws IOException {
    // Returns true once the connection is fully established
    boolean connected = socketChannel.finishConnect();
    if (connected)
        // Stop listening for OP_CONNECT, start listening for OP_READ
        key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
    return connected;
}
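The interestOps line is plain bit arithmetic: & binds tighter than |, so the expression first clears the OP_CONNECT bit and then sets the OP_READ bit. A quick demonstration with the actual flag values:

import java.nio.channels.SelectionKey;

public class InterestOpsDemo {
    public static void main(String[] args) {
        int ops = SelectionKey.OP_CONNECT;                       // 8 (binary 1000)
        ops = ops & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ;
        System.out.println(ops == SelectionKey.OP_READ);         // true: only READ (1) remains
    }
}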

More details on immediatelyConnectedKeys are covered in a separate article.

Sending Data

Sending data in Kafka takes two steps:

1. Call Selector#send to store the data to be sent in the corresponding KafkaChannel; this method performs no actual network I/O.

// Selector#send
public void send(Send send) {
    String connectionId = send.destination();
    // If the connection is being closed, record the send in the failedSends set
    if (closingChannels.containsKey(connectionId))
        this.failedSends.add(connectionId);
    else {
        KafkaChannel channel = channelOrFail(connectionId, false);
        try {
            channel.setSend(send);
        } catch (CancelledKeyException e) {
            this.failedSends.add(connectionId);
            close(channel, false);
        }
    }
}

// KafkaChannel#setSend
public void setSend(Send send) {
    // Fail if the previous send has not been fully written out yet
    if (this.send != null)
        throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
    // Stash the send
    this.send = send;
    // Start listening for WRITE events
    this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}
2. Call Selector#poll. Step 1 registered interest in WRITE events on this channel, so once the channel becomes writable, pollSelectionKeys is invoked and the data is actually sent out.
private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys, boolean isImmediatelyConnected, long currentTimeNanos) {
    Iterator<SelectionKey> iterator = selectionKeys.iterator();
    // Iterate over the set
    while (iterator.hasNext()) {
        SelectionKey key = iterator.next();
        // Remove the current element, otherwise the next poll would process it again
        iterator.remove();
        // Get the KafkaChannel created during connect
        KafkaChannel channel = channel(key);
        ...
        try {
            ...
            // WRITE event
            if (channel.ready() && key.isWritable()) {
                // The actual network write
                Send send = channel.write();
                // A Send may take several writes to drain; a non-null return means it completed
                if (send != null) {
                    // completedSends holds the sends that have been fully written out
                    this.completedSends.add(send);
                    this.sensors.recordBytesSent(channel.id(), send.size());
                }
            }
            ...
        } catch (Exception e) {
            ...
        } finally {
            maybeRecordTimePerConnection(channel, channelStartTimeNanos);
        }
    }
}

When the channel becomes writable, KafkaChannel#write is called, and that is where the real network I/O happens:

public Send write() throws IOException {
    Send result = null;
    if (send != null && send(send)) {
        result = send;
        send = null;
    }
    return result;
}

private boolean send(Send send) throws IOException {
    // Ultimately calls SocketChannel#write to do the real write
    send.writeTo(transportLayer);
    if (send.completed())
        // Once fully written, stop listening for WRITE events
        transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
    return send.completed();
}
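A Send is an abstraction over a payload that may need several write calls to drain, because a non-blocking socket accepts only as many bytes as its send buffer has room for. A rough sketch of the idea (a simplification in the spirit of Kafka's ByteBufferSend, not its actual code):

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;

// Wraps one buffer and reports completion once it has been fully drained.
public class SimpleSend {
    private final String destination;
    private final ByteBuffer buffer;

    public SimpleSend(String destination, ByteBuffer buffer) {
        this.destination = destination;
        this.buffer = buffer;
    }

    // May be called across several polls; each call writes whatever the socket accepts.
    public long writeTo(GatheringByteChannel channel) throws IOException {
        return channel.write(buffer);
    }

    // Only when this returns true is the Send handed to completedSends.
    public boolean completed() {
        return !buffer.hasRemaining();
    }

    public String destination() {
        return destination;
    }
}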

Receiving Data

When the remote end sends data over, the next call to poll processes what was received.

public void poll(long timeout) throws IOException {
    ...
    // select() is a thin wrapper around java.nio.channels.Selector#select
    int readyKeys = select(timeout);
    ...
    // If there are ready events or immediatelyConnectedKeys is non-empty
    if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
        // Handle the ready events; the 2nd argument is false
        pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
        // Handle immediatelyConnectedKeys; the 2nd argument is true
        pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
    }
    addToCompletedReceives();
    ...
}

private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys, boolean isImmediatelyConnected, long currentTimeNanos) {
    Iterator<SelectionKey> iterator = selectionKeys.iterator();
    // Iterate over the set
    while (iterator.hasNext()) {
        SelectionKey key = iterator.next();
        // Remove the current element, otherwise the next poll would process it again
        iterator.remove();
        // Get the KafkaChannel created during connect
        KafkaChannel channel = channel(key);
        ...
        try {
            ...
            // READ event
            if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
                NetworkReceive networkReceive;
                // read() pulls data off the wire, but a single read may yield only part of a
                // request; it returns non-null only once a complete request has been read
                while ((networkReceive = channel.read()) != null)
                    // Store each complete request in stagedReceives
                    addToStagedReceives(channel, networkReceive);
            }
            ...
        } catch (Exception e) {
            ...
        } finally {
            maybeRecordTimePerConnection(channel, channelStartTimeNanos);
        }
    }
}

private void addToStagedReceives(KafkaChannel channel, NetworkReceive receive) {
    if (!stagedReceives.containsKey(channel))
        stagedReceives.put(channel, new ArrayDeque<NetworkReceive>());
    Deque<NetworkReceive> deque = stagedReceives.get(channel);
    deque.add(receive);
}

The addToCompletedReceives method called afterwards processes this collection.

private void addToCompletedReceives() {
    if (!this.stagedReceives.isEmpty()) {
        Iterator<Map.Entry<KafkaChannel, Deque<NetworkReceive>>> iter = this.stagedReceives.entrySet().iterator();
        while (iter.hasNext()) {
            Map.Entry<KafkaChannel, Deque<NetworkReceive>> entry = iter.next();
            KafkaChannel channel = entry.getKey();
            // For a client, isMute returns false; the server relies on it to keep requests ordered
            if (!channel.isMute()) {
                Deque<NetworkReceive> deque = entry.getValue();
                addToCompletedReceives(channel, deque);
                if (deque.isEmpty())
                    iter.remove();
            }
        }
    }
}

private void addToCompletedReceives(KafkaChannel channel, Deque<NetworkReceive> stagedDeque) {
    // Move each channel's first NetworkReceive into completedReceives
    NetworkReceive networkReceive = stagedDeque.poll();
    this.completedReceives.add(networkReceive);
    this.sensors.recordBytesReceived(channel.id(), networkReceive.payload().limit());
}

Data read off the wire is first placed into stagedReceives; then, in addToCompletedReceives, at most one NetworkReceive per channel (if one is available) is taken out of stagedReceives and moved into completedReceives.
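Stripped of the surrounding bookkeeping, the draining rule is: per poll, move at most one receive per channel from its staged deque into completedReceives. A self-contained sketch of just that step (plain strings stand in for NetworkReceive):

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class StagedDrainDemo {
    public static void main(String[] args) {
        Map<String, Deque<String>> stagedReceives = new HashMap<>();
        List<String> completedReceives = new ArrayList<>();
        stagedReceives.put("node-1", new ArrayDeque<>(List.of("req1", "req2")));

        // One poll pass: pop at most one staged receive per channel
        Iterator<Map.Entry<String, Deque<String>>> iter = stagedReceives.entrySet().iterator();
        while (iter.hasNext()) {
            Map.Entry<String, Deque<String>> entry = iter.next();
            completedReceives.add(entry.getValue().poll());
            if (entry.getValue().isEmpty())
                iter.remove();
        }
        System.out.println(completedReceives); // [req1] -- req2 waits for the next poll
    }
}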

There are two reasons for this design:

1. For SSL connections the content is encrypted, so the exact number of bytes to read cannot be determined up front; the reader has to take as much as it can, which may be more than one request's worth. If no further data then arrives on that channel, the extra bytes already read would never get processed.
2. Kafka must ensure that requests on a channel are processed in the order they were sent. Hence, for each channel, the upper layer sees at most one request per poll, and only after that request has been handled are the others surfaced. On the server side, the channel is muted after each poll, i.e. no more data is read from it, and it is unmuted only once the request has been fully processed; the client side achieves the same through InFlightRequests#canSendMore, sketched below.
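A hedged sketch of that client-side gate (the class and field names here are assumptions for illustration, not Kafka's actual code; SimpleSend is the sketch from the sending section): a new request may go out to a node only when the most recently queued request has fully left the socket and the queue is below its limit.

import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical per-node in-flight gate, in the spirit of InFlightRequests#canSendMore.
public class InFlightGate {
    private final Deque<SimpleSend> inFlight = new ArrayDeque<>(); // newest request first
    private final int maxInFlightRequests;

    public InFlightGate(int maxInFlightRequests) {
        this.maxInFlightRequests = maxInFlightRequests;
    }

    public boolean canSendMore() {
        // Either nothing is in flight, or the newest request has been fully
        // written out and there is still room in the queue.
        return inFlight.isEmpty()
                || (inFlight.peekFirst().completed() && inFlight.size() < maxInFlightRequests);
    }
}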

The comment in the code covering this logic reads:

/* In the "Plaintext" setting, we are using socketChannel to read & write to the network. But for the "SSL" setting,
 * we encrypt the data before we use socketChannel to write data to the network, and decrypt before we return the responses.
 * This requires additional buffers to be maintained as we are reading from network, since the data on the wire is encrypted
 * we won't be able to read exact no.of bytes as kafka protocol requires. We read as many bytes as we can, up to SSLEngine's
 * application buffer size. This means we might be reading additional bytes than the requested size.
 * If there is no further data to read from socketChannel selector won't invoke that channel and we've have additional bytes
 * in the buffer. To overcome this issue we added "stagedReceives" map which contains per-channel deque. When we are
 * reading a channel we read as many responses as we can and store them into "stagedReceives" and pop one response during
 * the poll to add the completedReceives. If there are any active channels in the "stagedReceives" we set "timeout" to 0
 * and pop response and add to the completedReceives.
 * Atmost one entry is added to "completedReceives" for a channel in each poll. This is necessary to guarantee that
 * requests from a channel are processed on the broker in the order they are sent. Since outstanding requests added
 * by SocketServer to the request queue may be processed by different request handler threads, requests on each
 * channel must be processed one-at-a-time to guarantee ordering.
 */

End

This article analyzed the implementation of Kafka's network layer. When reading the Kafka source, it is easy to get lost without a clear picture of this layer, e.g. the mechanism that guarantees request/response ordering, or the fact that send is not the method doing the actual network I/O.

