Java NIO - Netty 的零拷贝

Netty 的零拷贝

零拷贝的必要性回顾

前面我们说过,零拷贝的核心目的是:消除冗余的数据搬运开销,榨干 CPU 和 I/O 的极限性能。在传统 I/O 中,数据在内核缓冲区与用户缓冲区之间频繁 “套娃式” 拷贝,会导致:CPU不断在内核态与用户态间上下文转换,并且CPU也被被大量占用去执行无意义的搬运指令。零拷贝的价值就在于:

  • :通过 mmap 或 sendfile 建立直接映射,数据直接在内核空间或硬件间传输。
  • :减少内存占用,降低总线带宽压力。
  • :降低网络延迟,是大规模高并发系统的性能分水岭。


Netty 中的零拷贝

Netty中的零拷贝和操作系统层面上的零拷贝是有区别的,不能混淆,我们所说的 Netty 零拷贝完全是基于Java层面或者说用户空间的,它更多的是偏向于应用中的数据操作优化,而不是系统层面的操作优化。

大部分场景下,在Netty接收和发送ByteBuffer的过程中会使用直接内存进行Socket通道读写,使用JVM的堆内存进行业务处理,会涉及直接内存、堆内存之间的数据复制。内存的数据复制其实是效率非常低的,Netty提供了多种方法,以帮助应用程序减少内存的复制。Netty 的零拷贝主要体现在五个方面:

  • Netty提供 CompositeByteBuf 组合缓冲区类,可以将多个 ByteBuf 合并为一个逻辑上的 ByteBuf,避免了各个ByteBuf 之间的拷贝。
  • Netty提供了ByteBuf 的浅层复制操作(sliceduplicate),可以将 ByteBuf 分解为多个共享同一个存储区域的 ByteBuf,避免内存的拷贝。
  • 在使用 Netty 进行文件传输时,可以调用 FileRegion 包装的 transferTo() 方法直接将文件缓冲区的数据发送到目标通道,避免普通的循环读取文件数据和写入通道所导致的内存拷贝问题。
  • 在将一个 byte 数组转换为一个 ByteBuf 对象的场景下,Netty 提供了一系列的包装类,避免了转换过程中的内存拷贝。
  • 如果通道接收和发送 ByteBuf 都使用直接内存进行 Socket 读写,就不需要进行缓冲区的二次拷贝。如果使用JVM 的堆内存进行 Socket 读写,那么 JVM 会先将堆内存 Buffer 拷贝一份到直接内存再写入 Socket 中,相比于使用直接内存,这种情况在发送过程中会多出一次缓冲区的内存拷贝。所以,在发送ByteBuffer 到 Socket时,尽量使用直接内存而不是 JVM 堆内存。


CompositeByteBuf 实现零拷贝

CompositeByteBuf 简介

CompositeByteBuf 可以把需要合并的多个 ByteBuf 组合起来,对外提供统一的 readIndex 和 writerIndex。CompositeByteBuf 只是在逻辑上是一个整体,在 CompositeByteBuf 内部,合并的多个ByteBuf 都是单独存在的。CompositeByteBuf 里面有一个Component 数组,聚合的 ByteBuf 都放在 Component 数组里面,最小容量为16。

在很多通信编程场景下,需要多个 ByteBuf 组成一个完整的消息。 例如,HTTP协议传输时消息总是由 Header(消息头)和 Body(消息体)组成。如果传输的内容很长,就会分成多个消息包进行发送,消息中的 Header 就需要重用,而不是每次发送都创建新的 Header 缓冲 区。这时可以使用 CompositeByteBuf 缓冲区进行 ByteBuf组合,避免内存拷贝。假设有一份协议数据,它由头部和消息体组成,而头部和消息体 是分别存放在两个ByteBuf中的,为了方便后续处理,要将两个 ByteBuf 进行合并:

合并多个 ByteBuf 示例

使用CompositeByteBuf 合并多个ByteBuf,大致的代码如下:

1
2
3
4
ByteBuf headerBuf = ...;
ByteBuf bodyBuf = ...;
CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();
compositeByteBuf.addComponents(headerBuf, bodyBuf);

不使用 CompositeByteBuf,将 header 和 body 合并为一个 ByteBuf 的代码大致如下:

1
2
3
4
5
6
ByteBuf headerBuf = ...;
ByteBuf bodyBuf = ...;
long length = headerBuf.readableBytes() + bodyBuf.readableBytes();
ByteBuf allBuf = Unpooled.buffer(length);
allBuf.writeBytes(headerBuf); // 拷贝header数据
allBuf.writeBytes(body); // 拷贝body数据

上述过程将header和body都拷贝到了新的 allBuf 中,这增加了两 次额外的数据拷贝操作。所以,使用CompositeByteBuf 合并 ByteBuf 可以减少两次额外的数据拷贝操作。下面是一段通过CompositeByteBuf来复用header的比较完整的演示代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class CompositeBufferTest {
@Test
public void byteBufComposite() {
// 消息头:copiedBuffer 会创建一个新的堆内存 ByteBuf
ByteBuf headerBuf = Unpooled.copiedBuffer("Owlias 1.1:", StandardCharsets.UTF_8);

// 消息体1
CompositeByteBuf cbuf = ByteBufAllocator.DEFAULT.compositeBuffer(); // 创建一个空的 CompositeByteBuf,可组合多个 ByteBuf
ByteBuf bodyBuf = Unpooled.copiedBuffer("name=zhangsan", StandardCharsets.UTF_8);
cbuf.addComponents(headerBuf, bodyBuf); // 将 headerBuf 和 bodyBuf 添加到组合缓冲区
sendMsg(cbuf);
// 重要:在 cbuf 释放前,对 headerBuf 调用 retain() 增加引用计数,因为 cbuf.release 会释放其包含的所有子缓冲区!
headerBuf.retain();
// 释放第一个组合缓冲区及其包含的所有 ByteBuf,由于上面调用了 headerBuf.retain(),headerBuf 的引用计数不会归零!
cbuf.release();

// 消息体2
cbuf = ByteBufAllocator.DEFAULT.compositeBuffer();
bodyBuf = Unpooled.copiedBuffer("file=xxxxxxx", StandardCharsets.UTF_8);
cbuf.addComponents(headerBuf, bodyBuf); // 重用 headerBuf,再次组合 headerBuf 和新的 bodyBuf
sendMsg(cbuf);
cbuf.release(); // 释放第二个组合缓冲区,headerBuf 和新的 bodyBuf 都会被释放掉
}

private void sendMsg(CompositeByteBuf cbuf) {
// 遍历 CompositeByteBuf 中的所有 ByteBuf 组件
for (ByteBuf b :cbuf) {
// 获取当前 ByteBuf 可读字节数
int length = b.readableBytes();
// 创建字节数组用于存储数据
byte[] array = new byte[length];
// 从当前 ByteBuf 的 readerIndex 位置开始,复制 length 个字节到 array,getBytes 是非消耗性读取,不会移动 readerIndex
b.getBytes(b.readerIndex(), array);
// 处理一下数组中的数据
System.out.print(new String(array, StandardCharsets.UTF_8));
}
System.out.println();
}
}
1
2
Owlias 1.1:name=zhangsan
Owlias 1.1:file=xxxxxxx

在上面的程序中,调用 CompositeByteBuf 的 addComponents()方法 向自身增加了ByteBuf对象实例。对于所添加的 ByteBuf,Heap ByteBuf、Direct ByteBuf 均可。

如果CompositeByteBuf内部只存在一个ByteBuf,则调用其 hasArray() 方法,返回的是这个唯一 ByteBuf 的 hasArray() 方法的值;如果有多个ByteBuf,则其 hasArray() 方法会返回 false。另外,调用 CompositeByteBuf的 nioBuffer() 方法可以将 CompositeByteBuf 实例合并成一个新的 NIO ByteBuffer 缓冲区(注 意:不是Netty的ByteBuf缓冲区)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Test
public void intCompositeTest() {
// CompositeByteBuf 内部最多只能逻辑组合 3 个 独立的 ByteBuf 实例。
// 当你尝试添加第 4 个组件时,Netty 可能会根据内部逻辑将现有的部分组件合并(拷贝)成一个更大的缓冲区,以确保组件总数不超过这个设定的上限。
CompositeByteBuf cbuf = Unpooled.compositeBuffer(3);
cbuf.addComponent(Unpooled.wrappedBuffer(new byte[]{1, 2, 3}));
cbuf.addComponent(Unpooled.wrappedBuffer(new byte[] {4}));
cbuf.addComponent(Unpooled.wrappedBuffer(new byte[]{5, 6}));

// 合并成一个的 Java NIO 缓冲区
ByteBuffer nioBuffer = cbuf.nioBuffer(0, 6);
byte[] bytes = nioBuffer.array();
System.out.print("bytes = ");
for (byte b : bytes) {
System.out.print(b);
}
cbuf.release();
}


通过wrap操作实现零拷贝

Unpooled 提供了一系列的wrap包装方法,可以帮助大家方便、快速地包装出 CompositeByteBuf 实例或者ByteBuf 实例,而不用进行内存拷贝。Unpooled包装CompositeByteBuf的操作使用起来更加方便。例如,上面的header 与 body 的组合可以调用 Unpooled.wrappedBuffer() 方法。大致的代码如下:

1
2
3
ByteBuf headerBuf = ...;
ByteBuf bodyBuf = ...;
ByteBuf allByteBuf = Unpooled.wrappedBuffer(headerBuf , bodyBuf);

Unpooled类提供了很多重载的wrappedBuffer()方法,将多个 ByteBuf包装为CompositeByteBuf实例,从而实现零拷贝。

1
2
3
4
public static ByteBuf wrappedBuffer(ByteBuffer buffer);
public static ByteBuf wrappedBuffer(ByteBuf buffer);
public static ByteBuf wrappedBuffer(ByteBuf... buffers);
public static ByteBuf wrappedBuffer(ByteBuffer... buffers);

除了通过 Unpooled 包装 CompositeByteBuf 之外,还可以将 byte 数组包装成 ByteBuf。如果将一个 byte 数组转换为一个 ByteBuf 对象,大致的代码如下:

1
2
3
4
// 通过调用Unpooled.wrappedBuffer()方法将bytes包装为一个UnpooledHeapByteBuf对象,在包装的过程中不会有拷贝操作。
// 所得到的ByteBuf对象和bytes数组共用同一个存储空间,对bytes的修改也是对ByteBuf对象的修改。
byte[] bytes = ...;
ByteBuf byteBuf = Unpooled.wrappedBuffer(bytes)

如果不是调用Unpooled.wrappedBuffer()包装方法,那么传统的做法是将此byte数组的内容拷贝到ByteBuf中,大致的代码如下:

1
2
3
byte[] bytes = ...;
ByteBuf byteBuf = Unpooled.buffer();
byteBuf.writeBytes(bytes);

显然,传统的转换方式是有额外的内存申请和拷贝操作的,既浪费了内存空间,又需要耗费内存复制的时间。相对而言,Unpooled 提供的wrap操作既复用了空间,又节省了时间。Unpooled提供了多个包装字节数组的重载方法,其中一些如下:

1
2
3
public static ByteBuf wrappedBuffer(byte[] array)
public static ByteBuf wrappedBuffer(byte[] array, int offset, int length)
public static ByteBuf wrappedBuffer(byte[]... arrays)