
4.3 服务器编程范例
本节介绍如何用java.nio包中的类来创建服务器EchoServer,本节提供了3种实现方式.
·4.3.1节的例程4-2:采用阻塞模式,用线程池中的工作线程处理每个客户连接。
·4.3.2节的例程4-3:采用非阻塞模式,单个线程同时负责接收多个客户连接,以及与多个客户交换数据的任务。
·4.3.3节的例程4-4:由一个线程负责接收多个客户连接,采用阻塞模式;由另一个线程负责与多个客户交换数据,采用非阻塞模式。
4.3.1 创建阻塞的EchoServer
当ServerSocketChannel与SocketChannel采用默认的阻塞模式时,为了同时处理多个客户的连接,必须使用多个线程。在例程4-2的block.EchoServer类中,利用java.util.concurrent包中提供的线程池ExecutorService来处理与客户的连接。
例程4-2 EchoServer.java(阻塞模式)



EchoServer类的构造方法负责创建线程池,启动服务器,把它绑定到一个本地端口。EchoServer类的service()方法负责接收客户的连接。每接收一个客户连接,就把它交给线程池来处理,线程池取出一个空闲的线程,来执行Handler对象的run()方法。Handler类的handle()方法负责与客户通信。该方法先获得与SocketChannel关联的Socket对象,然后从Socket对象中得到输入流与输出流,再接收和发送数据。
SocketChannel实际上也提供了read(ByteBuffer buffer),但是通过它来读取一行字符串比较麻烦。以下readLine()方法就通过SocketChannel的read(ByteBuffer buffer)方法来读取一行字符串,它的作用与BufferedReader的readLine()方法是等价的。


从以上程序代码可以看出,用SocketChannel来读取一行一行的字符串很麻烦,需要操纵ByteBuffer缓冲区,而且需要处理字节流与字符串之间的转换。这比使用输入流和输出流来处理一行一行的字符串麻烦多了。
本书配套源代码包的sourcecode/chapter04/src/block/EchoServer1.java演示用上述readLine()方法来读取一行字符串,本书不再列出完整代码。
4.3.2 创建非阻塞的EchoServer
在非阻塞模式下,EchoServer只需要启动一个主线程,就能同时处理3件事。
(1)接收客户的连接。
(2)接收客户发送的数据。
(3)向客户发回响应数据。
EchoServer委托Selector来负责监控接收连接就绪事件、读就绪事件和写就绪事件,如果有特定事件发生,就处理该事件。
EchoServer类的构造方法负责启动服务器,把它绑定到一个本地端口,代码如下。

EchoServer类的service()方法负责处理本节开头所说的3件事,体现其主要流程的代码如下:


在service()方法中,首先由ServerSocketChannel向Selector注册接收连接就绪事件。如果Selector监控到该事件发生,就会把相应的SelectionKey对象加入selected-keys集合中。service()方法接下来在第1层while循环中不断询问Selector已经发生的事件,然后依次处理每个事件。
Selector的select()方法返回当前相关事件已经发生的SelectionKey的个数。如果当前没有任何事件发生,select()方法就会阻塞下去,直到至少有一个事件发生。Selector的selectedKeys()方法返回selected-keys集合,它存放了相关事件已经发生的SelectionKey对象。
service()方法在第2层while循环中,从selected-keys集合中依次取出每个SelectionKey对象,把它从selected-keys集合中删除,然后调用isAcceptable()、isReadable()和isWritable()方法判断到底是哪种事件发生了,从而做出相应的处理。处理每个SelectionKey的代码放在一个try语句中,如果出现异常,就会在catch语句中使这个SelectionKey失效,并且关闭与之关联的Channel。
1.处理接收连接就绪事件
service()方法中处理接收连接就绪事件的代码如下。


如果SelectionKey的isAcceptable()方法返回true,就意味着这个SelectionKey所感兴趣的接收连接就绪事件已经发生了。service()方法首先通过SelectionKey的channel()方法获得与之关联的ServerSocketChannel对象,然后调用ServerSocketChannel的accept()方法获得与客户连接的SocketChannel对象。这个SocketChannel对象在默认情况下处于阻塞模式。如果希望它执行非阻塞的I/O操作,就需要调用它的configureBlocking(false)方法。SocketChannel调用Selector的register()方法来注册读就绪事件和写就绪事件,还向register()方法传递了一个ByteBuffer类型的参数,这个ByteBuffer将作为附件与新建的SelectionKey对象关联。本节稍后会介绍这个ByteBuffer的作用。
2.处理读就绪事件
如果SelectionKey的isReadable()方法返回true,就意味着这个SelectionKey所感兴趣的读就绪事件已经发生了。EchoServer类的receive()方法负责处理这一事件。

在receive()方法中,先获得与这个SelectionKey关联的ByteBuffer和SocketChannel。SocketChannel每次读到的数据都被添加到这个ByteBuffer,在程序中,由buffer变量引用这个ByteBuffer对象。在非阻塞模式下,socketChannel.read(readBuff)方法读到多少数据是不确定的,假定读到的字节数为n,那么0<=n<readBuff的容量。EchoServer要求每接收到客户的一行字符串XXX(也就是字符串以“\r\n”结尾),都返回字符串echo:XXX。由于无法保证socketChannel.read(readBuff)方法一次读入一行字符串,因此只好把它每次读入的数据都放到buffer中,当这个buffer中凑足了一行字符串,再把它发送给客户。
receive()方法的许多代码都涉及对ByteBuffer的3个属性(position、limit和capacity)的操作,图4-14演示了以上readBuff和buffer变量的3个属性的变化过程。假定SocketChannel的read()方法读入了6字节,把它存放在readBuff中,并假定buffer中原来有10字节,buffer.put(readBuff)方法把readBuff中的6字节拷贝到buffer中,buffer中最后有16字节。

图4-14 receive()方法操纵readBuff和buffer的过程
3.处理写就绪事件
如果SelectionKey的isWritable()方法返回true,就意味着这个SelectionKey所感兴趣的写就绪事件已经发生了。EchoServer类的send()方法负责处理这一事件。


EchoServer的receive()方法把读入的数据都放到一个ByteBuffer中,send()方法就从这个ByteBuffer中取出数据。如果ByteBuffer中还没有一行字符串,就什么也不做,直接退出send()方法;否则,就从ByteBuffer中取出一行字符串XXX,然后向客户发送echo:XXX。接着,send()方法把ByteBuffer中的字符串XXX删除。如果send()方法处理的字符串为“bye\r\n”,就使SelectionKey失效,并关闭SocketChannel,从而断开与客户的连接。
提示
EchoServer的receive()方法和send()方法都操纵同一个ByteBuffer,receive()方法向ByteBuffer中添加数据,而send()方法从ByteBuffer中取出数据。ByteBuffer的容量为1024字节。本程序假设这个ByteBuffer的容量足够大,不会发生receive()方法向ByteBuffer中添加数据时,缓冲区溢出的异常。如果要使程序代码更加健壮,就要考虑万一缓冲区中已经填充满了数据,无法再添加更多数据的情况,本书5.2.2节的例程5-3的ChannalIO类提供了解决方案,它实现了容量可增长的缓冲区。
4.编码与解码
在ByteBuffer中存放的是字节,它表示字符串的编码。而程序需要把字节转换为字符串,才能进行字符串操作,比如判断里面是否包含“\r\n”,以及截取子字符串。EchoServer类的实用方法decode()负责解码,也就是把字节序列转换为字符串。


decode()方法中的charset变量是EchoServer类的成员变量,它表示GBK中文编码,它的定义如下。

在send()方法中,当通过SocketChannel的write(ByteBuffer buffer)方法发送数据时,write(ByteBuffer buffer)方法不能直接发送字符串,而只能发送ByteBuffer中的字节。因此程序需要对字符串进行编码,把它们转换为字节序列,放在ByteBuffer中,然后发送。

EchoServer类的实用方法encode()负责编码,也就是把字符串转换为字节序列。

5.在非阻塞模式下确保发送一行数据
在send()方法的outputBuffer中存放了字符串echo:XXX的编码。在非阻塞模式下,SocketChannel.write(outputBuffer)方法并不保证一次就把outputBuffer中的所有字节都发送完,而是奉行能发送多少就发送多少的原则。如果希望把outputBuffer中的所有字节都发送完,就需要采用以下循环。

6.删除ByteBuffer中的已处理数据
与SelectionKey关联的ByteBuffer附件中存放了读操作与写操作的共享数据。receive()方法把读到的数据放入ByteBuffer,而send()方法从ByteBuffer中一行行地取出数据。当send()方法从ByteBuffer中取出一行字符串XXX后,就要把字符串从ByteBuffer中删除。在send()方法中,outputData变量就表示取出的一行字符串XXX,程序先把它编码为字节序列,放在一个名为temp的ByteBuffer中。接着把buffer的位置设为temp的极限,然后调用buffer的compact()方法删除代表字符串XXX的数据。

图4-15演示了以上代码操纵buffer的过程。图中假定temp中有10字节,buffer中本来有16字节,buffer.compact()方法删除缓冲区开头的10字节,最后剩下6字节。

图4-15 从buffer中删除已经处理过的一行字符串XXX
例程4-3是EchoServer的源程序。
例程4-3 EchoServer.java(非阻塞模式)



4.3.3 在EchoServer中混合用阻塞模式与非阻塞模式
在4.3.2节的例程4-3中,EchoServer的ServerSocketChannel以及SocketChannel都被设置为非阻塞模式,这使得接收连接、接收数据和发送数据的操作都采用非阻塞模式,EchoServer采用一个线程同时完成这些操作。假如有许多客户请求连接,可以把接收客户连接的操作单独由一个线程完成,把接收数据和发送数据的操作由另一个线程完成,这可以提高服务器的并发性能。
负责接收客户连接的线程按照阻塞模式工作,如果收到客户连接,就向Selector注册读就绪和写就绪事件,否则进入阻塞状态,直到接收到了客户的连接。负责接收数据和发送数据的线程按照非阻塞模式工作,只有在读就绪或写就绪事件发生时,才执行相应的接收数据和发送数据操作。
例程4-4是EchoServer类的源程序。其中receive()、send()、decode()和encode()方法的代码与4.3.2节的例程4-3的EchoServer类相同,为了节省篇幅,不再重复展示。
例程4-4 EchoServer.java(混合使用阻塞模式与非阻塞模式)


以上EchoServer类的构造方法与4.3.2节的例程4-3的EchoServer类的构造方法基本相同,唯一的区别是,在本例中,ServerSocketChannel采用默认的阻塞模式,即没有调用以下方法。

EchoServer类的accept()方法负责接收客户连接,ServerSocketChannel的accept()方法工作于阻塞模式,如果没有客户连接,就会进入阻塞状态,直到接收到了客户连接。接下来调用socketChannel.configureBlocking(false)方法把SocketChannel设为非阻塞模式,然后向Selector注册读就绪和写就绪事件。
EchoServer类的service()方法负责接收和发送数据,它在一个无限for循环中,不断调用Selector的select()方法查询已经发生的事件,然后做出相应的处理。
在EchoServer类的main()方法中,定义了一个匿名线程(暂且称它为Accept线程),它负责执行EchoServer的accept()方法。执行main()方法的主线程启动了Accept线程后,主线程就开始执行EchoServer的service()方法。因此当EchoServer启动后,共有两个线程在工作,Accept线程负责接收客户连接,主线程负责接收和发送数据。

当Accept线程开始执行以下方法时,

如果主线程正好在执行selector.select()方法,而且处于阻塞状态,那么Accept线程也会进入阻塞状态。两个线程都处于阻塞状态,很有可能导致死锁。导致死锁的具体情形为:Selector中尚没有任何注册的事件,即all-keys集合为空,主线程执行selector.select()方法时将进入阻塞状态,只有当Accept线程向Selector注册了事件,并且该事件发生后,主线程才会从selector.select()方法中返回。假如Selector中尚没有任何注册的事件,此时Accept线程调用socketChannel.register()方法向Selector注册事件,由于主线程正在selector.select()方法中阻塞,这使得Accept线程也在socketChannel.register()方法中阻塞。Accept线程无法向Selector注册事件,而主线程没有任何事件可以监控,所以这两个线程都将永远阻塞下去。
提示
SelectableChannel的register(Selector selector,…)和Selector的select()方法都会操纵Selector对象的共享资源all-keys集合。SelectableChannel以及Selector的实现对操纵共享资源的代码块进行了同步,从而避免对共享资源的竞争。同步机制使得一个线程执行SelectableChannel的register(Selector selector,…)时,不允许另一个线程同时执行Selector的select()方法,反之亦然。
为了避免死锁,程序必须保证当Accept线程正在通过socketChannel.register()方法向Selector注册事件时,不允许主线程正在selector.select()方法中阻塞。
为了协调Accept线程和主线程,EchoServer类在以下代码前加了同步标记。当Accept线程开始执行这段代码时,必须先获得gate对象的同步锁,然后进入同步代码块,先执行Selector对象的wakeup()方法,假如此时主线程正好在执行selector.select()方法,而且处于阻塞状态,那么主线程就会被唤醒,立即退出selector.select()方法。

主线程被唤醒后,在下一次循环中又会执行selector.select()方法,为了保证让Accept线程先执行完socketChannel.register()方法,再让主线程执行selector.select()方法,主线程必须先获得gate对象的同步锁。

假如Accept线程还没有执行完同步代码块,就不会释放gate对象的同步锁,这使得主线程必须等待片刻,等到Accept线程执行完同步代码块,释放了gate对象的同步锁,主线程才能恢复运行,再次执行selector.select()方法。