本篇内容主要讲解“KAFKA是如何处理粘包拆包的”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“KAFKA是如何处理粘包拆包的”吧!
我们知道,TCP数据包都是按照协议进行拆包、编号然后分批发送的;
那么对应我们应用层有意义的数据包,传输层的协议并不了解其含义,更不会去根据你的业务内容去分包和发送,只会按照自己的协议栈去进行数据发送。
因此,就出现了网络数据的粘包,拆包问题。
究其本质,其实就是传输层并不了解上层应用的数据含义,只会按照协议栈进行数据发送。
在了解出现这个问题的本质后,那么要想解决这个问题就很简单了。
不就是在进行数据接收的时候,我们应用层收到数据后根据标识判断一下,数据是否完整,如果完整了我们再进行数据包解析,最后交给业务代码不就好了?
通常解决粘包拆包的问题有三种方案:
定长,例如我保证我每一条数据都是200b,那么我每接收到200b就认为是一条完整的数据,接着就可以进行解析,并向业务代码交付。
分隔符,一样的意思,我每条数据末尾都用一个分隔符例如换行符,制表符这种来标识这条数据写完了,那么我们收到数据判找一下这个分割符在哪儿,最后进行切割就可以得到完整的数据包了。
自定义协议,这个也很简单,就是定义一个你的完整数据包的内容格式是什么样子的,例如 len + data,其中len是代表data的字节长度。这样每次根据前面4个字节的len,就能得到后面还需要多少数据才是一条完整的数据,少了就等,多了就截取。
最后,可能很多不熟悉网络编程的同学会纳闷,那万一TCP的数据包丢失了,乱序了,上面这种方法不就出问题了嘛?
其实不是的,TCP一个可靠的消息传输协议,其协议的根本思想就是提供可靠的数据传输服务。
翻译一下就是,你可以相信TCP传输的数据是可靠的,在交付给应用层数据的时候,是不会出现上述这种情况的。
出现这种情况只会在传输层出现,而TCP协议也为对应的情况设计了分批、编号、去重、校验和、超时重传等一系列的操作,来保证数据可靠。
最后,让我们来看下kafka是如何解决粘包拆包问题的呢?是以上面提到的哪种方式来解决的呢?
首先看粘包,也就是接收到了多余的数据,该如何拆分数据包,读取到正确完整的数据包?
如下面代码所示,分为三个阶段:
先读取前4字节,转换为一个int,即长度。
根据长度申请内存buffer。
最后读取指定大小的数据到申请好的buffer
由此,就完整了一整条数据的正确读取。整个过程其实就是上面提到的 len+data 这么一个简单的自定义协议。
public NetworkReceive read() throws IOException { NetworkReceive result = null; // 新建一个receive if (receive == null) {receive = new NetworkReceive(maxReceiveSize, id, memoryPool); } // 真正的数据read receive(receive); // 数据读取完成的后置操作 if (receive.complete()) { // 倒带,等待读receive.payload().rewind(); // 直接引用赋值 result = receive; // 最后清空当前引用,然后等待下次进入read的时候,执行new 操作 receive = null; } else if (receive.requiredMemoryAmountKnown() && !receive.memoryAllocated() && isInMutableState()) {//pool must be out of memory, mute ourselves. mute(); }return result;}
public long readFrom(ScatteringByteChannel channel) throws IOException {int read = 0; // 存在数据if (size.hasRemaining()) { // len + dataint bytesRead = channel.read(size); if (bytesRead < 0)throw new EOFException(); read += bytesRead; // 如果读满了长度,则直接倒带得到具体的len值 // 这里的size是一个byteBuffer类型的,也就是接收到的数据 if (!size.hasRemaining()) {size.rewind(); int receiveSize = size.getInt(); if (receiveSize < 0)throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")"); if (maxSize != UNLIMITED && receiveSize > maxSize)throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")"); requestedBufferSize = receiveSize; //may be 0 for some payloads (SASL) if (receiveSize == 0) {buffer = EMPTY_BUFFER; } } } // 如果长度已经就绪了,那么就需要接下来的data需要多少空间,在这里进行申请if (buffer == null && requestedBufferSize != -1) { //we know the size we want but havent been able to allocate it yet buffer = memoryPool.tryAllocate(requestedBufferSize); if (buffer == null)log.trace("Broker low on memory - could not allocate buffer of size {} for source {}", requestedBufferSize, source); } // 申请完毕之后,就调用read函数,直接read出来即可。if (buffer != null) { int bytesRead = channel.read(buffer); if (bytesRead < 0)throw new EOFException(); read += bytesRead; } // 返回读取的总字节数return read;}
再先看拆包,也就是接收到数据不够组成一条完整的数据,该如何等待完整的数据包?
下面代码最核心的就是receive.complete()函数的判断逻辑,这个判断的三个条件分别意味着:
!size.hasRemaining():接收到的buffer数据已经读取完成。
buffer != null:buffer已经创建。
!buffer.hasRemaining():buffer已经读取完成。
翻译一下,其实就是只要一条数据没读完整,那么receive.complete()函数返回值就是false,那么最终返回的结果就是null,等待下一次OP_READ事件的时候再接着上次没读完的数据读取,直到读取一条完整的数据为止。
public NetworkReceive read() throws IOException { NetworkReceive result = null; if (receive == null) {receive = new NetworkReceive(maxReceiveSize, id, memoryPool); } receive(receive); if (receive.complete()) {receive.payload().rewind(); result = receive; receive = null; } else if (receive.requiredMemoryAmountKnown() && !receive.memoryAllocated() && isInMutableState()) {//pool must be out of memory, mute ourselves. mute(); }return result;}
public boolean complete() { return !size.hasRemaining() && buffer != null && !buffer.hasRemaining();}
最后,我们再补充一点,当我们一次性收到很多条数据的时候,会如何处理呢?
下面的源码告诉了我们答案,就是一次性全部读取出来,然后存入stageReceives这个数据结构中等待下一步业务处理。
private void attemptRead(SelectionKey key, KafkaChannel channel) throws IOException {//if channel is ready and has bytes to read from socket or buffer, and has no //previous receive(s) already staged or otherwise in progress then read from it if (channel.ready() && (key.isReadable() || channel.hasBytesBuffered()) && !hasStagedReceive(channel) && !explicitlyMutedChannels.contains(channel)) { NetworkReceive networkReceive; // 一次性读取所有的receives,暂存到stageReceives中 while ((networkReceive = channel.read()) != null) { madeReadProgressLastPoll = true; addToStagedReceives(channel, networkReceive); }// isMute是判断当前channel是否关注了OP_READ事件 if (channel.isMute()) {outOfMemory = true; //channel has muted itself due to memory pressure. } else {madeReadProgressLastPoll = true; } } }
到此,相信大家对“KAFKA是如何处理粘包拆包的”有了更深的了解,不妨来实际操作一番吧!这里是亿速云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。