用java带你了解网络IO模型

1.BIO

1.1 简述

BIO是同步阻塞IO,所有连接都是同步执行的,在上一个连接未处理完的时候是无法接收下一个连接

image-20230515153842534

1.2 代码示例

在上述代码中,如果启动一个客户端起连接服务端时如果没有发送数据,那么下一个连接将永远无法进来

public static void main(String[] args) {
    try {
        // 监听端口
        ServerSocket serverSocket = new ServerSocket(8080);
        // 等待客户端的连接过来,如果没有连接过来,就会阻塞
        while (true) {
            // 阻塞IO中一个线程只能处理一个连接
            Socket socket = serverSocket.accept();
            System.out.println("客户端建立连接:"+socket.getPort());
            String line = null;
            try {
                BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                line = bufferedReader.readLine();
                System.out.println("客户端的数据:" + line);

                BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
                bufferedWriter.write("okn");
                bufferedWriter.flush();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}

1.3优点和缺点

优点:

  1. 简单易用,代码实现比较简单。

  2. 对于低并发量的场景,因为每个连接都有独占的线程处理IO操作,因此可以保证每个连接的IO操作都能够及时得到处理。

  3. 对于数据量较小的IO操作,同步阻塞IO模型的性能表现较好。

缺点:

  1. 由于每一个客户端连接都需要开启一个线程,因此无法承载高并发的场景。

  2. 线程切换的开销比较大,会导致系统性能下降。

  3. 对于IO操作较慢的情况下,会占用大量的线程资源,导致系统负载过高。

  4. 对于处理大量连接的服务器,BIO模型的性能较低,无法满足需求。

1.4 思考

问:既然每个连接进来都会阻塞,那么是否可以使用多线程的方式接收处理?

答:当然可以,但是这样如果有1w个连接那么就要启动1w个线程去处理吗,线程是非常宝贵的资源,频繁使用线程对系统的开销是非常大的

2. NoBlockingIO

2.1 简述

NoBlockingIO是同步非阻塞IO,相对比阻塞IO,他在接收数据的时候是非阻塞的,会一直轮询去问内核是否准备好数据,直到有数据返回

ps: NoBlockingIO并不是真正意义上的NIO

image-20230515161159720

2.2 代码示例

在下述代码中,将BIO中的ServerSocket修改为ServerSocketChannel,然后configureBlocking为false则为非阻塞,从而数据都是在channel的buffer(缓冲区)中获取,不理解没关系,就当作是设置非阻塞IO的方式就好

此时在accept中是非阻塞的,不断的等待客户端进来

注意

  1. accept是非阻塞,不断轮询,如果为空则跳过,不为空则添加连接
  2. 读数据是非阻塞,不断的轮询连接,等待客户端写入数据
public static List<SocketChannel> channelList = new ArrayList<>();

public static void main(String[] args) {
    try {
        // 相当于serverSocket
        // 1.支持非阻塞  2.数据总是写入buffer,读取也是从buffer中去读  3.可以同时读写
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        // 设置非阻塞
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.socket().bind(new InetSocketAddress(8080));
        while (true){
            // 这里将不再阻塞
            SocketChannel socketChannel = serverSocketChannel.accept();
            if(socketChannel != null){
                socketChannel.configureBlocking(false);
                channelList.add(socketChannel);
            }else {
                System.out.println("没有请求过来!!!");
            }
            for (SocketChannel client : channelList){
                ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                // 也不阻塞
                int num = client.read(byteBuffer);
                if(num>0){
                    System.out.println("客户端端口:"+ client.socket().getPort()+",客户端收据:"+new String(byteBuffer.array()));
                }else {
                    System.out.println("等待客户端写数据");
                }
            }
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}

2.3 优点和缺点

优点:

  1. 非阻塞I/O可以同时处理多个客户端连接,提高服务器的并发处理能力。

  2. 由于非阻塞I/O的模式下,一个线程可以处理多个I/O操作,因此可以减少线程切换次数,提高系统性能

缺点:

  1. 有很多无效访问,因为没有连接的时候accept也不会阻塞,很多为空的accpet
  2. 如果客户端没有写数据,会一直向内核访问,每次都是一个系统调用,非常浪费系统资源

2.4 思考

问 :既然一直轮询会产生很多的无效轮询,并浪费系统资源,那么有没有更好的办法呢

答: 通过事件注册的方式(多路复用器)

3. NIO(NewIO)

3.1 简述

NewIO才是真正意义上的NIO,NoBlockingIO只能算是NIO的前身,因为NewIO在NoBlockingIO上加上了多路复用器,使得NIO更加完美

在下图中,channel不再是直接循环调用内核,而是将连接,接收,读取,写入等事件注册到多路复用器中,如果没有事件到来将会阻塞等待

image-20230516095643939

NIO三件套(记):

  1. channel: 介于字节缓冲区(buffer)和套接字(socket)之间,可以同时读写,支持异步IO
  2. buffer: 字节缓冲区,是应用程序和通道之间进行IO数据传输的中转
  3. selector:多路复用器,监听服务端和客户端的管道上注册的事件

3.2 代码示例

从代码示例可以看到,在没有连接的时候会在selector.select()中阻塞,然后等待客户端连接或者写入数据,不同的监听事件会有不同的处理方法

具体流程:

  1. 服务端创建Selector,并注册OP_ACCEPT接受连接事件,然后调用select阻塞等待连接进来

  2. 客户端注册OP_CONNECT事件,表示连接客户端,连接成功后会调用handlerConnect方法

    2.1 handlerConnect方法会注册OP_READ事件并向服务端写数据

  3. 这时候服务端会收到OP_ACCEPT后就会走到handlerAccept方法,表示接受连接

    3.1handlerAccept方法也会注册一个OP_READ事件并向客户端写数据

  4. 客户端接收到服务端的数据后会再次唤醒select方法,然后判断为isReadable(读事件,服务端写入给客户端,那么客户端就是读),handlerRead方法将会把服务端写入的数据读取

  5. 反之亦然,服务端也会收到客户端写入的数据,然后通过读事件将数据读取

服务端代码

public class NewIOServer {
    static Selector selector;
    public static void main(String[] args) {

        try {
            selector = Selector.open();

            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.socket().bind(new InetSocketAddress(8080));

            // 需要把serverSocketChannel注册到多路复用器上
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

            while (true) {
                // 阻塞
                selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    iterator.remove();
                    if (key.isAcceptable()) {
                        handlerAccept(key);
                    } else if (key.isReadable()) {
                        handlerRead(key);
                    }else if(key.isWritable()){

                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private static void handlerRead(SelectionKey key) {
        SocketChannel socketChannel = (SocketChannel) key.channel();
        ByteBuffer allocate = ByteBuffer.allocate(1024);
        try {
            socketChannel.read(allocate);
            System.out.println("server msg:" + new String(allocate.array()));
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    private static void handlerAccept(SelectionKey key) {
        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
        // 不阻塞
        try {
            SocketChannel socketChannel = serverSocketChannel.accept();
            socketChannel.configureBlocking(false);
            socketChannel.write(ByteBuffer.wrap("It‘s server msg".getBytes()));
            // 读取客户端的数据
            socketChannel.register(selector, SelectionKey.OP_READ);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}

客户端代码

public class NewIOClient {

    static Selector selector;

    public static void main(String[] args) {
        try {
            selector = Selector.open();
            SocketChannel socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false);
            socketChannel.connect(new InetSocketAddress("localhost", 8080));

            // 需要把socketChannel注册到多路复用器上
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
            while (true) {
                // 阻塞
                selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    iterator.remove();
                    if (key.isConnectable()) {
                        handlerConnect(key);
                    } else if (key.isReadable()) {
                        handlerRead(key);
                    } else if (key.isWritable()) {

                    }
                }

            }
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    private static void handlerRead(SelectionKey key) {
        SocketChannel socketChannel = (SocketChannel) key.channel();
        ByteBuffer allocate = ByteBuffer.allocate(1024);
        try {
            socketChannel.read(allocate);
            System.out.println("client msg:" + new String(allocate.array()));
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    private static void handlerConnect(SelectionKey key) throws IOException {
        SocketChannel socketChannel = (SocketChannel) key.channel();
        if (socketChannel.isConnectionPending()) {
            socketChannel.finishConnect();
        }
        socketChannel.configureBlocking(false);
        socketChannel.write(ByteBuffer.wrap("it‘s client msg".getBytes()));
        socketChannel.register(selector,SelectionKey.OP_READ);
    }
}

3.3 优点和缺点

优点:

  1. NIO使用了非阻塞IO,可以大大提高系统的吞吐量和并发性能。

  2. NIO提供了可扩展的选择器,可以监控多个通道的状态,从而实现高效的事件驱动模型。

  3. NIO采用直接内存缓冲区,可以避免Java堆内存的GC问题,提高内存管理的效率。

缺点:

  1. NIO的编程模型相比传统的IO模型更加复杂,需要掌握较多的API和概念。
  2. NIO的实现难度较高,需要处理很多细节问题,如缓冲区的管理、选择器的使用等。
  3. NIO的可靠性不如传统的IO模型,容易出现空轮询、系统负载过高等问题。

3.3 思考

问:select方法不是也阻塞吗,那跟BIO有什么区别?

答:虽然他是在select阻塞,但是他通过事件注册的方式,可以将多个selectKey同时加载到selectionKeys集合中,通过for循环处理不同的事件,而BIO只能由一个连接处理完才能处理下一个连接

问:什么是多路复用?

答:

多路:是指多个连接的管道,通道

复用:复用一个系统调用,原本多次系统调用变成一次

4. 扩展select/poll、epoll

4.1 简述

由第三部分的NIO可知,多路复用把for循环的系统调用变成了一次调用,那么他具体是怎么实现的?

其实我们仔细思考一下就能知道,他主要实现就是在selector.select(),由他去阻塞和触发动作。然而在实现这些功能的时候,就用到了三种模型,select、poll、epoll。因为select和poll很相似,所以大家都会把他们归为一类。

4.2 select/poll

我们先来说说什么是select?

实现过程

  1. 每一个socket调用select()方法后,socket的等待队列就会放线程的引用,该线程就是你调用select的那个线程
  2. 当其中一个socket发送数据的时候,他会将每一个socket在等待队列中移除放入就绪队列,这就表明一定有一个客户端写了数据过来,但是注意,这并不表示所有都有客户端写了数据过来
  3. 这时候唤醒主线程,然后去就绪队列中遍历找到客户端写的数据并返回

具体如下图所示:

image-20230519110156726

产生问题

  1. 因为fd(file)是个数组,所以socket容量会有上限
  2. 只要有一个socket写入就会遍历所有socket,虽然减少了空轮询问题,但是每次都要在所有socket中去找到已准备好的那个socket需要消耗性能

什么是poll?

因为fd是个数组,所以容量会达到上限,而poll则将这个数据结构改成了链表,所以解决了select模型中上限的问题,但是遍历socket的问题还是存在

select和poll的本质区别就是一个是用数组存放socket,一个是用链表存放,其他地方没有任何区别

4.3 epoll

epoll和select/poll相比,采用了事件回调的机制,并且使用红黑树去维护注册的socket,如下图所示

image-20230522165104289

实现过程

  1. 调用Selector.open的时候会创建一个eventpoll的文件,里面主要含有等待队列,rbr(红黑树),就绪列表
  2. 然后在建立连接的时候调用epoll_ctl函数将socket放入epitem中
  3. 调用epoll_wait函数将线程放入等待队列中,等待数据过来时唤醒
  4. 有数据写入的时候会触发epitem的回调方法,将该epitem移除并加入rdlist就绪列表中
  5. 当有数据在就绪列表的时候,就会唤醒等待对列中的线程并处理数据

这样通过红黑树来维护连接和通过就绪列表来处理数据就可以保证可以存放最大限度的socket数量,并且在唤醒线程处理去处理就绪列表的时候肯定都是需要处理并且已就绪的socket。完美的解决了select/poll中的问题

总结

epoll相较于select/poll的优势:

  1. 采用了事件驱动的方式,可以处理大量的连接,效率更高。

  2. 支持边缘触发(ET)和水平触发(LT)两种模式,可以更灵活地处理IO事件。

  3. 记录了上次处理的位置,可以避免重复的遍历,更加高效。

  4. 高效利用了内核空间和用户空间的交互,避免了复制文件描述符。

4.4 扩展话题

对于epoll的一些扩展,有兴趣的可以了解下,不感兴趣可以略过

4.4.1 什么是ET和LT?

ET和LT是epoll工作模式中的两种触发方式,分别表示边缘触发(Edge Triggered)和水平触发(Level Triggered)。

  1. 边缘触发(ET)

    在ET模式下,当一个文件描述符上出现事件时,epoll_wait函数只会通知一次,即只有在文件描述符状态发生变化时才会返回。如果应用程序没有处理完这个事件,那么下一次调用epoll_wait函数时,它不会再返回这个事件,直到下一次状态变化。

    ET模式下的事件处理更为高效,因为它只会在必要的时候通知应用程序,避免了重复通知的问题。但是,由于ET模式只在状态变化时通知一次,因此应用程序需要及时处理事件,否则可能会错过某些事件。

  2. 水平触发(LT)

    在LT模式下,当一个文件描述符上出现事件时,epoll_wait函数会重复通知应用程序,直到该文件描述符上的事件被处理完毕为止。如果应用程序没有处理完这个事件,那么下一次调用epoll_wait函数时,它会再次返回这个事件,直到应用程序处理完为止。

    LT模式下的事件处理比较简单,因为它会重复通知应用程序,直到应用程序处理完为止。但是,由于重复通知的问题,LT模式下可能会导致一些性能问题。同时,在LT模式下,应用程序需要及时处理事件,否则可能会导致文件描述符上的事件积压,影响系统的性能。

4.4.2 什么是惊群?

epoll的惊群(Thundering Herd)指的是多个线程或进程同时等待同一个epoll文件描述符上的事件,当文件描述符上出现事件时,内核会通知所有等待的线程或进程,但只有一个线程或进程能够真正处理该事件,其他线程或进程会被唤醒但不能处理该事件,从而造成资源浪费和性能降低的问题。

惊群问题是由于内核通知等待线程或进程的方式引起的。在epoll中,当文件描述符上出现事件时,内核会通知所有等待的线程或进程,而不是通知一个线程或进程。因此,如果有多个线程或进程等待同一个文件描述符,那么当该文件描述符上出现事件时,内核会通知所有等待的线程或进程,导致惊群问题。

为了解决惊群问题,可以采用以下两种方式:

  1. 使用边缘触发(ET)模式:在ET模式下,当文件描述符上出现事件时,内核只会通知一个等待的线程或进程,从而避免了惊群问题。

  2. 采用互斥量或条件变量等机制:在多个线程或进程等待同一个文件描述符时,可以使用互斥量或条件变量等机制来控制线程或进程的唤醒,从而避免惊群问题。

5. AIO

5.1简述

在上面将的BIO,NIO中都是同步IO,BIO叫做同步阻塞,NIO叫做同步非阻塞,那么AIO则是异步IO,全名(Asynchronous I/O)

如下图所示,异步IO和多路复用机制,最大的区别在于:当数据就绪后,客户端不需要发送内核指令从内核空间读取

数据,而是系统会异步把这个数据直接拷贝到用户空间,应用程序只需要直接使用该数据即可。

image-20230522171957211

5.2 代码示例

从代码示例可以看到,以下代码都是基于回调机制实现的,并不会像BIO和NIO一样使用轮询的方式,他不需要像同步IO一样需要查找就绪socket,只要客户端有数据写入就会回调给服务端,既然是异步的所以就不会存在阻塞

服务端代码

public class AIOServer {

    public static void main(String[] args) throws Exception {
        // 创建一个SocketChannel并绑定了8080端口
        final AsynchronousServerSocketChannel serverChannel =
                AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(8080));

        serverChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
            @Override
            public void completed(AsynchronousSocketChannel socketChannel, Object attachment) {
                try {
                    // 打印线程的名字
                    System.out.println("2--"+Thread.currentThread().getName());
                    System.out.println(socketChannel.getRemoteAddress());
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    // socketChannel异步的读取数据到buffer中
                    socketChannel.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
                        @Override
                        public void completed(Integer result, ByteBuffer buffer) {
                            // 打印线程的名字
                            System.out.println("3--"+Thread.currentThread().getName());
                            buffer.flip();
                            System.out.println(new String(buffer.array(), 0, result));
                            socketChannel.write(ByteBuffer.wrap("HelloClient".getBytes()));
                        }

                        @Override
                        public void failed(Throwable exc, ByteBuffer buffer) {
                            exc.printStackTrace();
                        }
                    });
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

            @Override
            public void failed(Throwable exc, Object attachment) {
                exc.printStackTrace();
            }
        });

        System.out.println("1--"+Thread.currentThread().getName());
        Thread.sleep(Integer.MAX_VALUE);
    }
}

客户端代码

public class AIOClient {

    private final AsynchronousSocketChannel client;

    public AIOClient() throws IOException {
        client = AsynchronousSocketChannel.open();
    }

    public static void main(String[] args) throws Exception {
        new AIOClient().connect("localhost",8080);
    }

    public void connect(String host, int port) throws Exception {
        // 客户端向服务端发起连接
        client.connect(new InetSocketAddress(host, port), null, new CompletionHandler<Void, Object>() {
            @Override
            public void completed(Void result, Object attachment) {
                try {
                    client.write(ByteBuffer.wrap("这是一条测试数据".getBytes())).get();
                    System.out.println("已发送到服务端");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

            @Override
            public void failed(Throwable exc, Object attachment) {
                exc.printStackTrace();
            }
        });

        final ByteBuffer bb = ByteBuffer.allocate(1024);
        // 客户端接收服务端的数据,获取的数据写入到bb中
        client.read(bb, null, new CompletionHandler<Integer, Object>() {
            @Override
            public void completed(Integer result, Object attachment) {
                // 服务端返回数据的长度result
                System.out.println("I/O操作完成:" + result);
                System.out.println("获取反馈结果:" + new String(bb.array()));
            }

            @Override
            public void failed(Throwable exc, Object attachment) {
                exc.printStackTrace();
            }
        });

        try {
            Thread.sleep(Integer.MAX_VALUE);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

5.3 优点和缺点

优势:

  1. 更加高效:AIO采用回调方式,可以避免轮询等操作对CPU的占用,减少CPU的负担,从而提高了系统的性能。

  2. 可以更好地利用系统资源:AIO能够在I/O操作完成之前把线程释放出来,可以更好地利用系统资源,提高系统的并发处理能力。

  3. 适用于高并发场景:AIO适用于高并发场景,能够支持大量的并发连接,提高系统的处理能力。

缺点:

  1. 学习成本高:相比于NIO,AIO的编程模型更加复杂,需要学习更多的知识,学习成本更高。

  2. 实现难度大:AIO的实现难度比较大,需要对操作系统的底层机制有深入的了解,因此开发成本较高。

  3. 并非所有操作系统都支持:AIO并非所有操作系统都支持,只有Linux 2.6以上的内核才支持AIO,因此跨平台的支持较差。

ps: 说白了AIO很好用,但是太复杂

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
THE END
分享
二维码
< <上一篇
下一篇>>