温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

KAFKA是如何处理粘包拆包的

发布时间:2021-11-22 09:54:25 来源:亿速云 阅读:237 作者:iii 栏目:云计算

本篇内容主要讲解“KAFKA是如何处理粘包拆包的”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“KAFKA是如何处理粘包拆包的”吧!

一、为什么会出现粘包拆包现象?

我们知道,TCP数据包都是按照协议进行拆包、编号然后分批发送的;
那么对应我们应用层有意义的数据包,传输层的协议并不了解其含义,更不会去根据你的业务内容去分包和发送,只会按照自己的协议栈去进行数据发送。
因此,就出现了网络数据的粘包,拆包问题。
究其本质,其实就是传输层并不了解上层应用的数据含义,只会按照协议栈进行数据发送。


二、通常有哪些解决粘包拆包问题的方法?

在了解出现这个问题的本质后,那么要想解决这个问题就很简单了。
不就是在进行数据接收的时候,我们应用层收到数据后根据标识判断一下,数据是否完整,如果完整了我们再进行数据包解析,最后交给业务代码不就好了?
通常解决粘包拆包的问题有三种方案:

  1. 定长,例如我保证我每一条数据都是200b,那么我每接收到200b就认为是一条完整的数据,接着就可以进行解析,并向业务代码交付。

  2. 分隔符,一样的意思,我每条数据末尾都用一个分隔符例如换行符,制表符这种来标识这条数据写完了,那么我们收到数据判找一下这个分割符在哪儿,最后进行切割就可以得到完整的数据包了。

  3. 自定义协议,这个也很简单,就是定义一个你的完整数据包的内容格式是什么样子的,例如 len + data,其中len是代表data的字节长度。这样每次根据前面4个字节的len,就能得到后面还需要多少数据才是一条完整的数据,少了就等,多了就截取。

最后,可能很多不熟悉网络编程的同学会纳闷,那万一TCP的数据包丢失了,乱序了,上面这种方法不就出问题了嘛?
其实不是的,TCP一个可靠的消息传输协议,其协议的根本思想就是提供可靠的数据传输服务。
翻译一下就是,你可以相信TCP传输的数据是可靠的,在交付给应用层数据的时候,是不会出现上述这种情况的。
出现这种情况只会在传输层出现,而TCP协议也为对应的情况设计了分批、编号、去重、校验和、超时重传等一系列的操作,来保证数据可靠。


三、kakfa是如何解决粘包拆包问题的呢?

最后,让我们来看下kafka是如何解决粘包拆包问题的呢?是以上面提到的哪种方式来解决的呢?
首先看粘包,也就是接收到了多余的数据,该如何拆分数据包,读取到正确完整的数据包?
如下面代码所示,分为三个阶段:

  1. 先读取前4字节,转换为一个int,即长度。

  2. 根据长度申请内存buffer。

  3. 最后读取指定大小的数据到申请好的buffer

由此,就完整了一整条数据的正确读取。整个过程其实就是上面提到的 len+data 这么一个简单的自定义协议

public NetworkReceive read() throws IOException {
    NetworkReceive result = null;    // 新建一个receive    if (receive == null) {receive = new NetworkReceive(maxReceiveSize, id, memoryPool);    }    // 真正的数据read    receive(receive);    // 数据读取完成的后置操作    if (receive.complete()) {        // 倒带,等待读receive.payload().rewind();        // 直接引用赋值        result = receive;        // 最后清空当前引用,然后等待下次进入read的时候,执行new 操作        receive = null;    } else if (receive.requiredMemoryAmountKnown() && !receive.memoryAllocated() && isInMutableState()) {//pool must be out of memory, mute ourselves.        mute();    }return result;}
public long readFrom(ScatteringByteChannel channel) throws IOException {int read = 0;
    // 存在数据if (size.hasRemaining()) {        // len + dataint bytesRead = channel.read(size);        if (bytesRead < 0)throw new EOFException();        read += bytesRead;
        // 如果读满了长度,则直接倒带得到具体的len值
        // 这里的size是一个byteBuffer类型的,也就是接收到的数据        if (!size.hasRemaining()) {size.rewind();            int receiveSize = size.getInt();            if (receiveSize < 0)throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")");            if (maxSize != UNLIMITED && receiveSize > maxSize)throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")");            requestedBufferSize = receiveSize; //may be 0 for some payloads (SASL)            if (receiveSize == 0) {buffer = EMPTY_BUFFER;            }
        }
    }    // 如果长度已经就绪了,那么就需要接下来的data需要多少空间,在这里进行申请if (buffer == null && requestedBufferSize != -1) { //we know the size we want but havent been able to allocate it yet        buffer = memoryPool.tryAllocate(requestedBufferSize);        if (buffer == null)log.trace("Broker low on memory - could not allocate buffer of size {} for source {}", requestedBufferSize, source);    }
    // 申请完毕之后,就调用read函数,直接read出来即可。if (buffer != null) {        int bytesRead = channel.read(buffer);        if (bytesRead < 0)throw new EOFException();        read += bytesRead;    }    // 返回读取的总字节数return read;}

再先看拆包,也就是接收到数据不够组成一条完整的数据,该如何等待完整的数据包?
下面代码最核心的就是receive.complete()函数的判断逻辑,这个判断的三个条件分别意味着:

  • !size.hasRemaining():接收到的buffer数据已经读取完成。

  • buffer != null:buffer已经创建。

  • !buffer.hasRemaining():buffer已经读取完成。

翻译一下,其实就是只要一条数据没读完整,那么receive.complete()函数返回值就是false,那么最终返回的结果就是null,等待下一次OP_READ事件的时候再接着上次没读完的数据读取,直到读取一条完整的数据为止。

public NetworkReceive read() throws IOException {
    NetworkReceive result = null;    if (receive == null) {receive = new NetworkReceive(maxReceiveSize, id, memoryPool);    }

    receive(receive);    if (receive.complete()) {receive.payload().rewind();        result = receive;        receive = null;    } else if (receive.requiredMemoryAmountKnown() && !receive.memoryAllocated() && isInMutableState()) {//pool must be out of memory, mute ourselves.        mute();    }return result;}
public boolean complete() {    return !size.hasRemaining() && buffer != null && !buffer.hasRemaining();}

最后,我们再补充一点,当我们一次性收到很多条数据的时候,会如何处理呢?
下面的源码告诉了我们答案,就是一次性全部读取出来,然后存入stageReceives这个数据结构中等待下一步业务处理。

private void attemptRead(SelectionKey key, KafkaChannel channel) throws IOException {//if channel is ready and has bytes to read from socket or buffer, and has no    //previous receive(s) already staged or otherwise in progress then read from it    if (channel.ready() && (key.isReadable() || channel.hasBytesBuffered()) && !hasStagedReceive(channel)
        && !explicitlyMutedChannels.contains(channel)) {
        NetworkReceive networkReceive;        // 一次性读取所有的receives,暂存到stageReceives中        while ((networkReceive = channel.read()) != null) {            madeReadProgressLastPoll = true;            addToStagedReceives(channel, networkReceive);        }// isMute是判断当前channel是否关注了OP_READ事件        if (channel.isMute()) {outOfMemory = true; //channel has muted itself due to memory pressure.        } else {madeReadProgressLastPoll = true;        }
    }
}

到此,相信大家对“KAFKA是如何处理粘包拆包的”有了更深的了解,不妨来实际操作一番吧!这里是亿速云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI