BIO、NIO、AIO

懵逼了 N 回的 BIO 和 NIO,重新认识一下啊。

BIO 同步并阻塞

我们先自己实现一个简单的单线程服务器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
//服务端
public class Server {
public static void main(String[] args) {
byte[] buffer = new byte[1024];
try {
ServerSocket serverSocket = new ServerSocket(8080);
System.out.println("服务器已启动并监听8080端口");
while (true) {
System.out.println();
System.out.println("服务器正在等待连接...");
Socket socket = serverSocket.accept();
System.out.println("服务器已接收到连接请求...");
System.out.println();
System.out.println("服务器正在等待数据...");
socket.getInputStream().read(buffer);
System.out.println("服务器已经接收到数据");
System.out.println();
String content = new String(buffer);
System.out.println("接收到的数据:" + content);
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}

//客户端
public class Consumer {
public static void main(String[] args) {
try {
Socket socket = new Socket("127.0.0.1",8080);
Thread.sleep(5000);
socket.getOutputStream().write("向服务器发数据".getBytes());
socket.close();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}

copy 并运行上述代码,我们看到服务端启动后就会一直循环阻塞线程等待客户端的接入。
观察整个执行流程发现BIO会产生两次阻塞,第一次是在等待连接时阻塞,第二次是在等待客户端数据时阻塞。
单线程的BIO弱点很明显,在不使用多线程的BIO情况下,服务端仅能同时处理一个请求。
为了提高并发,处理更多的用户请求,于是引入多线程处理,每当一个新的请求来到以后创建一个新线程去处理此连接,然后程序继续恢复原来的阻塞待请求状态。下面是一段伪代码(别看很简单,tomcat的BIO实现就是这个原理哦!)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
  public class Server {
public static void main(String[] args) {
byte[] buffer = new byte[1024];
try {
ServerSocket serverSocket = new ServerSocket(8080);
System.out.println("服务器已启动并监听8080端口");
while (true) {
System.out.println();
System.out.println("服务器正在等待连接...");
Socket socket = serverSocket.accept();
new Thread(new Runnable() {
@Override
public void run() {
System.out.println("服务器已接收到连接请求...");
System.out.println();
System.out.println("服务器正在等待数据...");
try {
socket.getInputStream().read(buffer);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("服务器已经接收到数据");
System.out.println();
String content = new String(buffer);
System.out.println("接收到的数据:" + content);
}
}).start();
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}

思考一下上面的代码,上面多线程处理虽然解决了单线程无法并发处理的问题,但是还有一个问题。如果有请求连接到服务器但没有及时发送来数据,甚至说不打算发送数据,这种情况下程序还是会创建一个线程去处理这个请求。如果请求不多服务器此时处理还是正常的,但是请求过多后就会对服务器造成较大压力。为了解决这个问题NIO应运而生了。

NIO 同步非阻塞

分析了BIO,我们知道了NIO需要解决的问题;

  • 等待连接时和等待数据时的阻塞

  • 大量无效请求占用线程

    模拟NIO解决方案
    1、我们可以在请求进入后查询是否有数据到来进行判断,减少阻塞。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
      public class Server {
    public static void main(String[] args) throws InterruptedException {
    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
    try {
    //Java为非阻塞设置的类
    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    serverSocketChannel.bind(new InetSocketAddress(8080));
    //设置为非阻塞
    serverSocketChannel.configureBlocking(false);
    while(true) {
    SocketChannel socketChannel = serverSocketChannel.accept();
    if(socketChannel==null) {
    //表示没人连接
    System.out.println("正在等待客户端请求连接...");
    Thread.sleep(5000);
    }else {
    System.out.println("当前接收到客户端请求连接...");
    }
    if(socketChannel!=null) {
    //设置为非阻塞
    socketChannel.configureBlocking(false);
    byteBuffer.flip();//切换模式 写-->读
    int effective = socketChannel.read(byteBuffer);
    if(effective!=0) {
    String content = Charset.forName("utf-8").decode(byteBuffer).toString();
    System.out.println(content);
    }else {
    System.out.println("当前未收到客户端消息");
    }
    }
    }
    } catch (IOException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
    }
    }
    }

    尽管这样还是有些问题,可能客户端没来得及发送消息服务端程序就执行完成了,并没有等到数据读取,也无法再复盘重新读取数据。

    2、这次我们把连接保存起来,方便在没有及时收到消息时还能再次读取。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
      public class Server {
    public static void main(String[] args) throws InterruptedException {
    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

    List<SocketChannel> socketList = new ArrayList<SocketChannel>();
    try {
    //Java为非阻塞设置的类
    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    serverSocketChannel.bind(new InetSocketAddress(8080));
    //设置为非阻塞
    serverSocketChannel.configureBlocking(false);
    while(true) {
    SocketChannel socketChannel = serverSocketChannel.accept();
    if(socketChannel==null) {
    //表示没人连接
    System.out.println("正在等待客户端请求连接...");
    Thread.sleep(5000);
    }else {
    System.out.println("当前接收到客户端请求连接...");
    socketList.add(socketChannel);
    }
    for(SocketChannel socket:socketList) {
    socket.configureBlocking(false);
    int effective = socket.read(byteBuffer);
    if(effective!=0) {
    byteBuffer.flip();//切换模式 写-->读
    String content = Charset.forName("UTF-8").decode(byteBuffer).toString();
    System.out.println("接收到消息:"+content);
    byteBuffer.clear();
    }else {
    System.out.println("当前未收到客户端消息");
    }
    }
    }
    } catch (IOException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
    }
    }
    }

    代码解析

    在解决方案一中,我们采用了非阻塞方式,但是发现一旦非阻塞,等待客户端发送消息时就不会再阻塞了,而是直接重新去获取新客户端的连接请求,这就会造成客户端连接丢失,而在解决方案二中,我们将连接存储在一个list集合中,每次等待客户端消息时都去轮询,看看消息是否准备好,如果准备好则直接打印消息。

    可以看到,从头到尾我们一直没有开启第二个线程,而是一直采用单线程来处理多个客户端的连接,这样的一个模式可以很完美地解决BIO在单线程模式下无法处理多客户端请求的问题,并且解决了非阻塞状态下连接丢失的问题。

    存在的问题(解决方案二)
    从刚才的运行结果中其实可以看出,消息没有丢失,程序也没有阻塞。但是,在接收消息的方式上可能有些许不妥,我们采用了一个轮询的方式来接收消息,每次都轮询所有的连接,看消息是否准备好,测试用例中只是三个连接,所以看不出什么问题来,但是我们假设有1000万连接,甚至更多,采用这种轮询的方式效率是极低的。

    另外,1000万连接中,我们可能只会有100万会有消息,剩下的900万并不会发送任何消息,那么这些连接程序依旧要每次都去轮询,这显然是不合适的。

    Java中的NIO实现方式为将客户端发送的连接请求都注册到多路复用器上,多路复用器轮询到连接有I/O请求时启动一个线程进行处理。

    真实NIO中如何解决
    在真实NIO中,并不会在Java层上来进行一个轮询,而是将轮询的这个步骤交给我们的操作系统来进行,他将轮询的那部分代码改为操作系统级别的系统调用(windows中为select函数,在linux环境中为epoll),在操作系统级别上调用select函数,主动地去感知有数据的socket。

AIO(NIO2) 异步非阻塞

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
  //AIO服务端
public class AIOServer {
public final static int PORT = 9888;
private AsynchronousServerSocketChannel server;

public AIOServer() throws IOException {
server = AsynchronousServerSocketChannel.open().bind(
new InetSocketAddress(PORT)
);
}

public void startWithFuture() throws InterruptedException, ExecutionException,
TimeoutException {
System.out.println("Sever listen on " + PORT);
Future<AsynchronousSocketChannel> future = server.accept();
AsynchronousSocketChannel socket = future.get();
ByteBuffer readBuf = ByteBuffer.allocate(1024);
readBuf.clear();
socket.read(readBuf).get(100, TimeUnit.SECONDS);
readBuf.flip();
System.out.println("received message:" + new String(readBuf.array()));
System.out.println(Thread.currentThread().getName());
}

public void startWithCompletionHandler() throws InterruptedException, ExecutionException,
TimeoutException {
System.out.println("Server listen on " + PORT);
//注册事件和事件完成后的处理器
server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
final ByteBuffer buffer = ByteBuffer.allocate(1024);

public void completed(AsynchronousSocketChannel result, Object attachment) {
System.out.println(Thread.currentThread().getName());
System.out.println("start");
try {
buffer.clear();
result.read(buffer).get(100, TimeUnit.SECONDS);
buffer.flip();
System.out.println("received message: " + new String(buffer.array()));
} catch(InterruptedException | ExecutionException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
try{
result.close();
server.accept(null, this);
} catch(Exception e) {
e.printStackTrace();
}
}
System.out.println("end");
}

@Override
public void failed(Throwable exc, Object attachment) {
System.out.println("failed: " + exc);
}
});

// 主线程继续自己的行为
while(true) {
System.out.println("main thread");
Thread.sleep(1000);
}
}

public static void main(String[] args) throws Exception {
new AIOServer().startWithCompletionHandler();
}
}

//客户端
public class AIOClient{

public static void main(String[] args) throws Exception {
AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
client.connect(new InetSocketAddress("localhost", 9888));
client.write(ByteBuffer.wrap("just a test".getBytes())).get();
}

}

AIO相对NIO来说异步能力是很重要的一个特点,客户端的I/O请求都是由OS先完成了再通知服务器应用去启动线程进行处理,AIO其实是一种在读写操作结束之前允许进行其他操作的I/O处理。

一般的I/O模型

  • 同步阻塞:在这种方式下,用户发起I/O操作后,必须等待IO操作完成后用户线程才能继续运行。最基本的 BIO 属于此种模式。
  • 同步非阻塞:在此种模式下,用户发起I/O请求后,可以继续做其他事情,等隔一段时间用户去询问是否操作完成即可,可以减少CPU浪费。NIO 属于此模式,将多个 Channel 注册到 Selectors 然后定时去询问是否完成。
  • 异步阻塞:此种方式下是指应用发起一个IO操作以后,不等待内核IO操作的完成,等内核完成IO操作以后会通知应用程序,这其实就是同步和异步最关键的区别,同步必须等待或者主动的去询问IO是否完成,那么为什么说是阻塞的呢?因为此时是通过select系统调用来完成的,而select函数本身的实现方式是阻塞的,而采用select函数有个好处就是它可以同时监听多个文件句柄,从而提高系统的并发性!
  • 异步非阻塞:在此种模式下,用户进程只需要发起一个IO操作然后立即返回,等IO操作真正的完成以后,应用程序会得到IO操作完成的通知,此时用户进程只需要对数据进行处理就好了,不需要进行等待实际的IO读写操作,因为真正的IO读取或者写入操作已经由内核完成了。目前 AIO 就是基于此模型。