本篇内容主要讲解“AIO与NIO的实际区别是什么”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“AIO与NIO的实际区别是什么”吧!
1. 从某种程度上来说,NIO依然是同步阻塞的
虽然NIO中Channel(网络Channel)和Buffer可以实现非阻塞的read/write操作,而且Selector提供了多路复用的功能,使得可以在一个线程中管理使用多个IO通道,避免了传统IO的存在的问题。但是,NIO中在Selector进行调用select()方法进行通道选择时,其依旧是同步阻塞的,而且由于多个Channel注册于Selector上,这个方法会同时阻塞多个IO请求操作,尽管select()方法可以设置超时返回,但依旧是不利的。
换句话说,虽然NIO在网络操作中,提供了非阻塞的read/write方法,但是NIO的IO行为还是同步的。对于NIO来说,我们的业务线程是在IO操作准备好时(调用Selector中的select()方法),得到通知(select()方法返回,表示有准备好的Channel),接着就由这个线程自行进行IO操作(通过Channel进行read/write操作),在第一代NIO中,每个线程可以持有多个IO通道并选择使用,但实际上一个线程还是只能选择操作一个IO,IO操作本身是同步的。
2. NIO改进
为了真正实现异步非阻塞的IO操作,在NIO的基础上进行改进,升级为2代NIO——即AIO机制。
AIO相比于NIO,则更加进了一步,它不是在IO准备好时再通知线程,而是在IO操作已经完成后,再给线程发出通知。因此AIO是不会阻塞的,此时我们的业务逻辑将变成一个回调函数,等待IO操作完成后,由系统自动触发。也就是相当于
在AIO中,当进行读写操作时,只须直接调用API的read或write方法即可。这两种方法均为异步的,对于读操作而言,当有流可读取时,操作系统会将可读的流传入read方法的缓冲区,并通知应用程序;对于写操作而言,当操作系统将write方法传递的流写入完毕时,操作系统主动通知应用程序。 即可以理解为,read/write方法都是异步的,完成后会主动调用回调函数。
主要在Java.nio.channels包下增加了下面四个异步通道:
AsynchronousSocketChannel
AsynchronousServerSocketChannel
AsynchronousFileChannel
AsynchronousDatagramChannel
在AIO socket编程中,服务端通道是AsynchronousServerSocketChannel,这个类提供了一个open()静态工厂,一个bind()方法用于绑定服务端IP地址(还有端口号),另外还提供了accept()用于接收用户连接请求。在客户端使用的通道是AsynchronousSocketChannel,这个通道处理提供open静态工厂方法外,还提供了read和write方法。
在AIO编程中,发出一个事件(accept read write等)之后要指定事件处理类(回调函数),AIO中的事件处理类是CompletionHandler<V,A>,这个接口定义了如下两个方法,分别在异步操作成功和失败时被回调。
void completed(V result, A attachment);
void failed(Throwable exc, A attachment);
3. AIO与NIO的实际区别
在JAVA NIO框架中,我们说到了一个重要概念“selector”(选择器)。它负责代替应用查询中所有已注册的通道到操作系统中进行IO事件轮询、管理当前注册的通道集合,定位发生事件的通道等操操作。
但是在JAVA AIO框架中,由于应用程序不是“轮询”方式,而是订阅-通知方式,所以不再需要“selector”(选择器)了,改由channel通道直接到操作系统注册监听。异步IO则采用“订阅-通知”模式:即应用程序向操作系统注册IO监听,然后继续做自己的事情。当操作系统发生IO事件,并且准备好数据后,在主动通知应用程序,触发相应的函数。这就使得AIO真正意义上实现了异步阻塞模式。(AIO是依赖于操作系统的实现的)
和同步IO一样,异步IO也是由操作系统进行支持的。微软的windows系统提供了一种异步IO技术:IOCP(I/O CompletionPort,I/O完成端口);
Linux下由于没有这种异步IO技术,所以使用的是epoll(类似于Selector的一种多路复用IO技术的实现)对异步IO进行模拟。
1. java.nio.channels.AsynchronousChannel:这是一个接口,用来标记一个channel支持异步IO操作。有主要的三个子类AsynchronousFileChannel、AsynchronousSocketChannel和AsynchronousServerSocketChannel,分别对应FileChannel、SocketChannel以及ServerSocketChannel。(很奇怪为什么没有AsynchronousDatagramChannel)
2. AsynchronousChannelGroup:异步channel的分组管理,目的是为了资源共享。一个AsynchronousChannelGroup绑定一个线程池,这个线程池执行三个任务:等待IO事件、处理IO数据和派发CompletionHandler。AsynchronousServerSocketChannel创建的时候可以传入一个AsynchronousChannelGroup,那么通过AsynchronousServerSocketChannel创建的 AsynchronousSocketChannel将同属于一个组,共享资源,(可以理解为相当于Selector)。AsynchronousChannelGroup需要绑定线程池来创建,通过三个静态方法来创建,可以需要根据具体应用相应调整。
public abstract class AsynchronousChannelGroup { public static AsynchronousChannelGroup withFixedThreadPool(int nThreads, ThreadFactory threadFactory); public static AsynchronousChannelGroup withCachedThreadPool(ExecutorService executor,int initialSize); public static AsynchronousChannelGroup withThreadPool(ExecutorService executor); }
3. CompletionHandler:异步IO操作结果的回调接口,用于定义在IO操作完成后所作的回调工作。AIO的API允许两种方式来处理异步操作的结果,返回的Future模式或者注册CompletionHandler,常用CompletionHandler的方式,这些handler的调用是由AsynchronousChannelGroup的线程池派发的。显然,线程池的大小是性能的关键因素。
CompletionHandler接口有两个个方法,分别对应于处理成功、失败、被取消(通过返回的Future)情况下的回调处理:
public interface CompletionHandler<V,A> { void completed(V result, A attachment); void failed(Throwable exc, A attachment); }
4. ByteBuffer:负责承载通信过程中需要读、写的消息。
使用方式主要为三步:打开通道、绑定监听端口、接收客户端连接请求。
1. 打开(创建)通道
可以通过调用AsynchronousServerSocketChannel的静态方法open()来创建AsynchronousServerSocketChannel实例
try { AsynchronousServerSocketChannel serverSocket = AsynchronousServerSocketChannel.open(); }catch (IOException e) { e.printStackTrace(); }
或者在open()方法传入AsynchronousChannelGroup参数,设置通道分组,以实现组内通道资源共享。如果通道打开失败,就会抛出IOException
try { ExecutorService pool = Executors.newCachedThreadPool(); AsynchronousChannelGroup group = AsynchronousChannelGroup.withCachedThreadPool(pool, 1024); AsynchronousServerSocketChannel serverSocket = AsynchronousServerSocketChannel.open(group); }catch (IOException e) { e.printStackTrace(); }
AsynchronousChannelGroup封装了处理由绑定到组的异步通道所触发的I/O操作完成所需的机制。每个AsynchronousChannelGroup关联了一个被用于提交处理I/O事件和分发消费在组内通道上执行的异步操作结果的completion-handlers的线程池。除了处理I/O事件,该线程池还有可能处理其他一些用于支持完成异步I/O操作的任务。从上面例子可以看到,通过指定AsynchronousChannelGroup的方式打开AsynchronousServerSocketChannel,可以定制server channel执行的线程池。如果不指定AsynchronousChannelGroup,则AsynchronousServerSocketChannel会归类到一个默认的分组中。
2. 绑定监听端口和地址
通过调用bind()方法来绑定要监听的端口。
try { ExecutorService pool = Executors.newCachedThreadPool(); AsynchronousChannelGroup group = AsynchronousChannelGroup.withCachedThreadPool(pool, 1024); AsynchronousServerSocketChannel serverSocket = AsynchronousServerSocketChannel.open(group); int port = 8888; serverSocket.bind(new InetSocketAddress(port)); }catch (IOException e) { e.printStackTrace(); }
3. 监听和接收客户端连接请求
监听客户端连接请求,主要通过调用accept()方法完成。accept()有两个重载方法:
public abstract <A> void accept(A,CompletionHandler<AsynchronousSocketChannel,? super A>); public abstract Future<AsynchronousSocketChannel> accept();
这两个重载方法的行为方式完全相同,提供CompletionHandler回调参数或者返回一个Future<T>类型变量。
Future版本的accept方法通过Future接口可以调用Future.get()方法阻塞等待调用结果,返回一个AsynchronousSocketChannel对象。
try { ExecutorService pool = Executors.newCachedThreadPool(); AsynchronousChannelGroup group = AsynchronousChannelGroup.withCachedThreadPool(pool, 1024); AsynchronousServerSocketChannel serverSocket = AsynchronousServerSocketChannel.open(group); int port = 8888; serverSocket.bind(new InetSocketAddress(port)); while(true) { Future<AsynchronousSocketChannel> accept = serverSocket.accept(); AsynchronousSocketChannel socket = accept.get();//阻塞方法,获取AsynchronousSocketChannel //通过获取的Socket来进行网络IO操作 //但一般不这样使用,因为这样就会导致变得和第一代NIO一样了,所以基本都是使用另一种CompletionHandler的重载方法 } }catch (IOException e1) { e1.printStackTrace(); }catch (InterruptedException e2) { e2.printStackTrace(); } catch (ExecutionException e3) { e3.printStackTrace(); }
而CompletionHandler回调参数版本则相反,真正的数据IO处理并不会放在当前线程中,而是通过一个回调方法处理,处理逻辑代码就写在CompletionHandler中的completed方法中,因为该方法会在AsynchronousServerSocketChannel成功接收到一个AsynchronousSocketChannel,回调执行,而如果AsynchronousServerSocketChannel接受AsynchronousSocketChannel失败,就会回调failed方法。
serverSocketChannel .accept(serverSocketChannel, new CompletionHandler<AsynchronousSocketChannel, AsynchronousServerSocketChannel>() { @Override public void completed(final AsynchronousSocketChannel result, final AsynchronousServerSocketChannel attachment) { // 接收到新的客户端连接,此时本次accept已经完成 // 继续监听下一个客户端连接到来 serverSocketChannel.accept(serverSocketChannel,this); // result即和该客户端的连接会话 // 此时可以通过result与客户端进行交互 } ... });
为什么会在completed方法中调用accept方法:因为当一个新的客户端建立连接之后,就会回调completed方法,一个AsynchronousServerSocketChannel会与多个客户端建立连接,此时就需要继续调用accept方法来接受更多的客户端连接。
4. 设置TCP连接属性:通过一个AsynchronousServerSocketChannel建立的连接肯定是TCP连接了,所以通过该对象我们可以设置TCP连接的一些属性。
// 设置socket选项,比如设置保持TCP连接,也就是TCP长连接 serverSocketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE,true); // 获取socket选项设置 boolean keepAlive = serverSocketChannel.getOption(StandardSocketOptions.SO_KEEPALIVE);
获取本地IP地址
InetSocketAddress address = (InetSocketAddress) serverSocketChannel.getLocalAddress();
1. 创建连接
首先需要调用open方法创建一个AsynchronousSocketChannel对象,然后通过connect方法与服务端建立连接。connect方法也有两个重载版本
一个版本是返回Future对象,另一种是传入CompletionHandler参数对象
AsynchronousSocketChannel socket = AsynchronousSocketChannel.open(); // Future future = socket.connect(new InetSocketAddress("localhost",8888)); // future.get(); socket.connect(new InetSocketAddress("localhost", 8888), socket, new CompletionHandler<Void, AsynchronousSocketChannel>() { @Override public void completed(Void result, AsynchronousSocketChannel attachment) { } @Override public void failed(Throwable exc, AsynchronousSocketChannel attachment) { } });
2. 写数据
构建一个ByteBuffer
对象并调用socketChannel.write(ByteBuffer)
方法异步发送消息,并通过CompletionHandler
回调接收处理发送结果:
ByteBuffer writeBuf = ByteBuffer.wrap("From socketChannel:Hello i am socketChannel".getBytes()); socketChannel.write(writeBuf, null, new CompletionHandler<Integer, Object>() { @Override public void completed(final Integer result, final Object attachment) { // 发送完成,result:总共写入的字节数 } @Override public void failed(final Throwable exc, final Object attachment) { // 发送失败 } });
3. 读数据
构建一个指定接收长度的ByteBuffer
用于接收数据,调用socketChannel.read()
方法读取消息并通过CompletionHandler
处理读取结果:
ByteBuffer readBuffer = ByteBuffer.allocate(128); socketChannel.read(readBuffer, null, new CompletionHandler<Integer, Object>() { @Override public void completed(final Integer result, final Object attachment) { // 读取完成,result:实际读取的字节数。如果通道中没有数据可读则result=-1。 } @Override public void failed(final Throwable exc, final Object attachment) { // 读取失败 } });
4. 通过AsynchronousSocketChannel也可以设置设置/获取socket选项(TCP连接属性)
// 设置socket选项 socketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE,true); // 获取socket选项设置 boolean keepAlive = socketChannel.getOption(StandardSocketOptions.SO_KEEPALIVE);
1. AIO中定义的异步通道允许指定一个CompletionHandler处理器消费一个异步操作的结果(也就是当准备好IO数据通道后,就回调CompletionHandler中的方法,使用IO数据通道进行IO处理,这也就导致了异步操作,不在等候IO通道的就绪,也不用将IO操作在当前线程中执行,而是采用回调的方式)。从上文中也可以看到,AIO中大部分的异步I/O操作接口都封装了一个带CompletionHandler类型参数的重载方法,使用CompletionHandler可以很方便地处理AIO中的异步I/O操作结果。CompletionHandler是一个具有两个泛型类型参数的接口,声明了两个接口方法:
public interface CompletionHandler<V,A> { void completed(V result, A attachment); void failed(Throwable exc, A attachment); }
NIO以及AIOU虽然实现了异步非阻塞网络IO操作,但是,其依旧具有一些缺点:
虽然JAVA NIO 和 JAVA AIO框架提供了多路复用IO/异步IO的支持,但是并没有提供上层“信息格式”的良好封装。例如前两者并没有提供针对 ProtocolBuffer、JSON这些信息格式的封装,但是Netty框架提供了这些数据格式封装(基于责任链模式的编码和解码功能)
要编写一个可靠的、易维护的、高性能的(注意它们的排序)NIO/AIO服务器应用。除了框架本身要兼容实现各类操作系统的实现外。更重要的是它应该还要处理很多上层特有服务,例如:客户端的权限、还有上面提到的信息格式封装、简单的数据读取。这些Netty框架都提供了响应的支持。
JAVA NIO框架存在一个poll/epoll bug:Selector doesn’t block on Selector.select(timeout),不能block意味着CPU的使用率会变成100%(这是底层JNI的问题,上层要处理这个异常实际上也好办)。当然这个bug只有在Linux内核上才能重现。这个问题在JDK 1.7版本中还没有被完全解决:http://bugs.java.com/bugdatabase/view_bug.do?bug_id=2147719。虽然Netty 4.0中也是基于JAVA NIO框架进行封装的(上文中已经给出了Netty中NioServerSocketChannel类的介绍),但是Netty已经将这个bug进行了处理。
到此,相信大家对“AIO与NIO的实际区别是什么”有了更深的了解,不妨来实际操作一番吧!这里是亿速云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。