fagle

[转]高性能聊天系统

0
阅读(1926)
本章以建立一个聊天系统为例,介绍如何使用J2SE 1.4非堵塞I/ONI/O)为核心开发一个高性能的实时互动服务器系统,这个高性能服务器系统可以拓展为更广阔的应用,如游戏系统、社区系统或者数据实时采集系统。

1.1  系统需求

聊天交流是目前互联网提供的主要内容。聊天系统有多种实现方式,类似ICQ属于一种点对点的聊天系统,还有一种是基于Socket的集中式聊天系统,这种聊天系统需要登录统一的聊天服务器,每个人的聊天信息其他人都可以看到,类似一种会议室,当然,两个人之间也可以进行保密的私语。

在基于Socket的聊天系统中,主要有两种角色:服务器和客户端,不同的客户端登录集中式的服务器,通过服务器将一个客户端发出的信息推送到其他所有客户端。

基于Socket的聊天系统最早实现是使用网页刷新方式,通过客户端不断地自动刷新,将服务器端整个页面内容下载到客户端显示,这种方式的聊天速度慢,而且有刷屏现象,很快被更新的聊天技术所替代。

聊天系统在客户端和服务器之间主要传送的是文字信息,服务器端只需要把最新的文字信息推送到客户端,这样减少了网络传输内容,节省了网络传输的时间,无疑提高了聊天速度。这种“推”技术是目前基于Socket聊天系统的主要实现技术。

一个基于Socket的聊天系统有下列具体功能要求:

1)客户端和服务器必须保持随时随地的连接。这有别于普通Web浏览的连接方式。在使用浏览器访问服务器时,先由客户端发出HTTP协议,然后服务器响应处理这个客户端的响应,再返回处理结果;请求(Request)和响应(Response)是一种一对一的前后因果关系。

而在基于Socket的聊天系统中,客户端发出聊天信息的同时,客户端也在接受服务器发送过来的其他人的聊天信息,因此,请求和响应不存在那种前后对应关系,是两种分别独立进行的进程。

因为服务器任何时候都可能发送信息到客户端,因此,客户端和服务器一旦建立连接,必须能让服务器在以后发送中寻找定位到这个连接。

2)在速度性能方面,聊天系统提出了更高的要求。在网络连接的薄弱环节I/O通信方面,要求能够实现无堵塞地、流畅地数据读写。在面对几百个甚至更多的客户端同时发出连接信息的情况下,服务器要求能够保持高性能的并发处理机制,迅速地完成这几百个并发请求的处理和发送任务。

3)在扩展性和伸缩性方面,聊天系统也提出了一定的要求。当一台服务器不能满足要求时,必须在客户端不知晓的情况下,通过不断增加服务器就能方便地拓展聊天系统的整体处理能力。对于客户端用户来说,这些服务器群都象征一个统一的服务器,不需要他们在进入聊天室之前先选择具体的服务器;也没有单个聊天室最大人数的限制,如果可以,服务器群可以支撑一个巨大容量的聊天室。

1.2  架构设计

本系统的设计核心是Socket底层通信,基于快速稳定的Socket底层通信架构,不但可以实现聊天系统,还可以实现其他如游戏、数据采取等实时性要求较高的系统,甚至可以建立一个快速的平台服务器系统。相比J2EE服务器系统,该平台系统的最大优势就是精简的代码带来的高性能。

当然,如果单纯追求高性能和速度,也许直接使用汇编就可以。使用Java设计这样的实时系统,实际还有一种很重要的目的,即追求高度的可扩展性和伸缩性。

因此,本系统设计必须将高性能和高伸缩性两个方面和谐地统一起来,不能盲目追求性能而破坏面向对象的编程风格和模式;也不能因为追求更大的重用性,建立太多复杂的中间层,其实这方面J2EE已经做得很好,有关J2EE的应用将在以后章节重点讨论。

当然,高性能应该是本系统的主要特色,为了实现本系统高效率的并发处理性能,设计上将采取Reactor模式实现自我快速触发;网络通信上,使用非堵塞I/O进行流畅地数据读写,在应用逻辑上,通过非堵塞的多线程技术实现并发功能处理。特别是J2SE 1.4以后版本推出的非堵塞I/O,将使得Java表现出和C语言同样的优越性能。

1.2.1  Java事件模型

事件只有在被触发时才会发生,它的发生是程序系统无法预料和计划的。例如,火警探测器只有在发生火警时才会触发,但是火警的发生是无法事先预料的,它处于时刻可能发生当中。为了能响应这些无法预料发生的事件,必须建立一套事件处理机制,以预备在发生事件时实现相应的处理。

通常,在一个事件处理框架里有下列3种角色。

·       源目标:事件发生者。

·       监视者:将监察侦听事件的发生,事件发生后,它会被通知到。

·       处理者:事件发生后,它将实现一定的行为动作,也就是处理事件。

Java中有几种模式表达这3种角色之间的关系。

监视模式是一种最常用的模式,GOF《设计模式》中的观察者模式和监视模式类似,这将在以后章节讨论。

在监视模式中,监视者本身也兼任处理者的角色,3种角色被实现为两个独立对象,源目标为一个对象,而监视者和处理者两个角色同位于一个对象中。

例如,一个可视化JavaBean对象Button属于源目标,它通过addActionListener绑定一个监视者对象java.awt.event.ActionListener的子类来完成事件触发机制,当它被用户点按时,ActionListener的子类将完成相应点按事件的处理,如图1-1所示。

nio

1-1  Java GUI中的事件处理

以本系统为例,当用户输入服务器端地址和端口,单击连接按钮后,将启动连接远程服务器线程,其中部分重要方法如下:

connectButton.addActionListener(new ClientFrame_connectButton_actionAdapter(this));

void connectButton_actionPerformed(ActionEvent e) {

        //启动连接服务器线程

        NonBlockingSocket nonBlockingSocket = new NonBlockingSocket(url, port);

        nonBlockingSocket.setDaemon(true);

        nonBlockingSocket.start();

}

连接按钮connectButton通过addActionListener方法加入了一个监视者对象,监视者ClientFrame_connectButton_actionAdapter代码如下:

class ClientFrame_connectButton_actionAdapter

implements java.awt.event.ActionListener {

  ClientFrame adaptee;

  ClientFrame_connectButton_actionAdapter(ClientFrame adaptee) {

    this.adaptee = adaptee;

  }

  public void actionPerformed(ActionEvent e) {

    adaptee.connectButton_actionPerformed(e);

  }

}


上述代码中,通过适配器模式将事件处理委托源目标实现,这就产生了第二种事件处理模式——委托模式。

委托模式使源目标、监视者和处理者3种角色各自实现为独立的3个对象,在这3个对象之间传输的是事件,这个模式在系统复杂时会经常使用,例如在J2EEB/S架构中,前台JSP的表单提交事件后,由MVC模式中的Servlet获得事件数据,经过简单封装成事件对象后,委托给后台EJB层实现进一步处理,如图1-2所示。

1-2中,客户端浏览器将表单事件直接提交到服务器端的JSP/ServletsJSP/Servlets作为事件的监视者,接收到事件后,并不对事件立即进行处理,而是委托给JavaBeans/EJB进行复杂的逻辑或运算处理。

nio

1-2  B/S多层结构委托模式

以上两种模式特点是至少有两个对象分别代表事件的3个角色,而在Reactor模式中,则是在一个对象中绑定了这3种角色,Reactor的意思是自我触发、自主激活的意思。

J2SE 1.4版本中的新特性非堵塞I/ONonblocking I/O)提供了基于Reactor模式的实现,这大大简化了基于Socket的应用和编程,如图1-3所示。

nio

1-3  Reactor模式

在非堵塞I/O API监视器是其一个重要的类Selector被监视的源目标是可以被Selector联系的SelectableChannel基本也属于Selector的相关部分),事件类型有是否有接受的连接OP_ACCEPT、是否可以连接OP_CONNECT、是否可以读取OP_READ和是否可以写入OP_WRITE

监视器Selector主要是监视这些事件,一旦发生,生成SelectionKey对象,Selector是自我触发、自我激活的,因此是典型的Reactor模式实现,其原理将在后面章节详细讨论。

但是,在非堵塞I/O API中,并不是由Selector来实现事件的处理,事件处理是由Selector激活出来后,通过其他处理器Handler来实现处理,开发者使用非堵塞I/O API需要做的工作就是:获取Selector激发的事件,然后根据相应事件类型,编制自己的处理器代码来进行具体处理。例如,如果是可读取事件,那么编制代码从SelectableChannel读取数据包,然后处理这个数据包。

在非堵塞I/O API中,使用Reactor模式将事件发生和事件处理两个部分实现分离解耦,事件发生部分只负责事件的激活,而事件处理由专门的处理器实现具体处理。

1.2.2  架构设计图

 

nio

1-4  架构层次图

考虑到系统的可重用性和伸缩性,需要将本系统的网络通信底层和应用系统分离开。这样,基于可重用的网络通信层,可以实现其他各种实时性较高的应用系统,同时,系统还需要提供一些基本功能支持,如网络连接状态管理以及用户状态相关管理,前者为实现一个动态的实时在线系统提供基本连接的管理,后者类似J2EEServlet部分的Session管理。

本系统在架构设计上将分3个层次,如图1-4所示。

本系统最底层是Socket通信层,将负责客户端和服务器之间快速的数据交换,它通过接口层和最上面应用层实现解耦,同时又通过接口层和应用层保持实时数据联系。用户从客户端进入到本系统前,将实现统一的用户登录验证机制。有关用户登录验证机制有多种实现方式,可以参见后面章节的讨论。

用户成功进入系统以后,将会有一个生存周期,生存周期依据不同底层协议有不同的具体实现。不管哪一种实现方式,都必须在内存中保存用户连接的相关状态,如用户的IP地址、用户最新连接时间等。

为了保证系统的安全性,用户在登陆验证通过后将分配一个随即的SessionID,用户的每次请求都将包含这个SessionID,服务器每次接受请求后,将此SessionID和保存在内存中的数据实现核对。

这里将着重讨论Socket底层以及接口层的设计和实现。Socket底层设计分两大部分:协议设计和连接处理设计;接口层的目的是提供底层和应用层一个中介媒体作用,但是不能设计得太复杂,以免延误数据传送时间。

1.2.3  协议设计

TCP是一种面向连接的协议,传输数据比较可靠。TCP协议中包含了专门的传递保证机制:当接收方收到发送方传来的信息时,会自动向发送方发出确认消息;发送方只有在接收到该确认消息之后才正式传送数据信息,否则将一直等待直到收到确认信息为止。

TCP在正式收发数据前,首先必须建立可靠的连接。一个TCP连接需要经过3次对话才能建立起来,其中的过程非常复杂。

基于TCP有各种会话应用协议,如HTTPFTP等协议。其中,HTTP协议是Internet最常用的协议,其最大的特点是能够穿透各种防火墙,因此,传送数据包以HTTP协议传送是一个实用的选择。

UDP是面向非连接的协议,传送数据之前不需要建立专门的连接,直接发送就可以,因此速度要比TCP快。由于UDP协议并不提供数据传送的保证机制,因此可能发生丢包的情况。UDP适合一次只传送少量数据、对可靠性要求不高的应用环境。

基于以上因素,在本系统中,聊天信息属于一种短小信息,一般情况下可以使用UDP发送,但是为防止数据丢包,在UDP发送失败的情况下可采取TCP再发送一次,而传送的数据采取HTTP协议。这是一个基于TCP/UDP、使用HTTP协议传送数据的混合实现方案。

这种方案带来的最大特点就是通信速度快,服务器和客户端减少了等待连接时间,提高了发送和响应时间。特别对于服务器而言,由于所需TCP链接数量减少,降低了因为建立、维护和撤销TCP链接所带来的服务器负荷,提高了服务器的吞吐量。

本方案模式同样适用于无线通信领域,目前无线通信网络的带宽比较窄,特别是网络质量很不稳定,客户端如果位于J2ME手机端,采取UDP/TCP混合方案可以同时解决带宽窄和网络不稳定两个问题。

本系统的具体实现分服务器和客户端两个方面。客户端将首先采取UDP发送,在UDP发送失败的情况下,采取TCP再进行发送;服务器处理HTTP请求后,产生相应的HTTP响应,响应数据如果无法放进一个UDP数据包中,则要求客户端使用TCP重试。当然,也可以采取其他TCP/UDP选择方案,要求系统中,这种选择策略是可以替换的。


1.2.4  多线程

前面介绍了3Java事件处理模式。在一个微观系统中,事件处理机制的实现主要是依靠Java线程来实现,一般监视者是一个线程,专门用于监测源目标的变化。

 Java 程序中使用多线程要比在 C C++ 中容易得多,因为 Java 编程语言提供了语言级的支持,但是这并非意味着在使用时可以避开线程的一些基本问题。在以后章节中介绍的JSP/Servlet容器,实际是一个线程池容器,JSP在运行时将编译成Servlet,而Servlet是一种线程类,J2EE通过Servlet概念的提出,确保开发者不用担心线程以及同步等问题,可以像往常一样编程。

无论是开发独立多线程的Java Application或使用Servlet,有一个概念总是需要时刻注意:对同一资源访问时需要考虑同步(Synchronization)的问题。但是,同步使用需要慎重,过多使用反而会降低性能,甚至发生死锁(DeadLock),同步只是在复杂的情况下不得已使用的一种办法。在使用同步之前有两个因素需要仔细考虑:首先确定是否一定需要同步;然后确定被访问资源是否属于原子型(atomic)的。下面从这两个方面详细讨论一下。

在一些情况下,多线程访问同一个资料是不需要同步的,如读操作,针对方法体内局部变量的写操作也不需要同步,关键是对类变量的访问操作,一旦设置了类变量,那么就需要非常小心。采取类变量有两种形式,如下:

public class Test{

    private int state;

private volatile long stateLong ;

private byte[] states = null;

    private String stateStrs = null;

    private final Object stateObject = null;

    private HashMap map = new HashMap;

    private Hashtable hashtable = new Hashtable;

 

    public void setState(int state){

         this.state = this.state + state;

    }

    …

}

在上面的Test类中,有5个类变量,分别代表5种不同的类变量:

1state的类型是整型(int),整型是属于Java原始型变量(primitive)。原始变量的操作访问都是原子型(atomic)的(longdouble除外),因此对于原始型变量的操作访问都是线程安全的,不需要实现同步。

2)对longdouble操作访问可以加上volatile,如上面代码第3行。多线程工作中有主内存和工作内存之分,在JVM中有一个主内存,专门负责所有线程共享数据;而每个线程都有它自己私有的工作内存,volatile变量表示保证它必须是与主内存保持一致,它实际是变量的同步。但是,由于volatileJava语言规范中表单不够详细,不是所有的Java虚拟机都支持volatile的。

3)第4行是一个数组state,数组是属于对象(Object),因此,对数组state的访问必须使用Synchronization实现同步。当然,String也属于对象,因此使用时需要注意,在这种情况下还是有可能避免使用Synchronization,而使用Java的对象不变性(immutability)。

不变性对象就是指自从产生那一刻起就无法再改变的对象,一个对象如果有下列2种情况就属于不变性对象,对这些不变性对象的访问就无需使用同步。

4String之类对象。一旦赋值给String,该String对象的长度和内容都不会改变,如果要变化需通过同样性质的类StringBuffer来实现。

5)使用final,这样就阻止对这个类再进行继承拓展的可能,而且可以提高JVM的效率,例如Test类中第6行。

一个类的所有类属性都是通过类的构造方法来设置,没有其他set之类的方法。

类似Stringtrim()toUpperCase()这样的修改后数据结果是在另外一个对象中,Stringtrime()等方法并不是对自己本身对象进行修改,而是将结构保存到另外一个对象中。


因此,在实际编程中,尽量操作方法体内局部变量,这样就不需要考虑同步问题。如果必须做成类变量,那么,想办法使自己的类变量变成一个不变性对象,还是可以避免同步Synchronization)的使用。

使用HashMapHashtable保存对象引用时也需要注意同步的问题。在向HashMap中加入新对象引用时,要使用同步方法;而Hashtable已经实现了内部同步,则在同样操作时不需要加同步,同样,ListVector也是这样的关系。

在必须使用同步的情况下,要注意避免发生死锁。死锁的情况是:A线程试图访问一个资源对象,但这个对象正在被B线程访问,处于锁定状态,暂时无法使用,如果B一直不释放锁定,那么A线程就发生死锁现象。

避免死锁没有完全之策,只有根据自己的应用小心设计,有几种办法对避免死锁有所帮助。

1)通过制造缩小同步范围,尽可能地实现代码块同步。

2)如果使用wait,可以指定毫秒数,让它在一定时间后结束等待,避免死锁。因为wait是被notify()notifyAll()唤醒的,要保证这两个方法确实能够唤醒wait

3)使用性能调试工具可以检测死锁现象发生,如BorlandOptimizeit Profiler

程序系统中一定要避免死锁,线程死锁后会一直占据CPU,这称为Block,这时的CPU使用率100%,严重阻碍了其他线程的运行,降低了系统的性能,容易发生Block还有等待I/O的响应,现在有了非堵塞I/O的帮助,这个问题基本可以避免。

另外一个容易阻塞住CPU的使用就是死循环,使用while(true)这样的语句让线程进入死循环运行,这样的线程会一直占据CPU,解决办法很灵活,如下:

1)使用while (!Thread.interrupted())代替while(true)语句,这样使得线程在执行错误时能够放弃对CPU独霸。

2)在循环体内尽量加上sleep(1000L)这样的语句,这样让CPU有空闲处理其他线程。如果必须做到实时,那么考虑是否有其他应用上的前提限制,使用这些前提条件暂时阻止循环反复执行。

以本系统为例,在客户端需要将用户输入的聊天信息发往服务器,那么建立一个线程一直实现发送功能,由于客户端监视用户输入也有一个监视线程在运行,例如使用Swing实现时,ActionListener的具体实现将监视用户输入。

nio

1-5  队列Queue模式

这样有两个线程各司其职。一个线程负责监视输入,另外一个线程负责将输入发送出去。那么在这两个线程之间如何通信?最经常使用的办法是使用队列(Queue)模式。Queue模式是处理消息通信的基本办法,如图1-5所示。

在图1-5中,一个线程负责不断向Queue中加入新的对象,而另外一个线程则不断地从Queue中读取加入的对象,在Queue中,对象数据排着队等待提取。在Java中,LinkList是队列Queue的最好实现。

在本系统中应用Queue模式就有一个问题,加入动作是由用户输入决定的,一旦有用户输入,就会发生加入动作,这由SwingActionListener负责,那么,提取线程会在队列另外一端进入死循环不断地读取,这样才能在队列中一旦有对象事件时,能够被立即提取出来,因此必须使用while (!Thread.interrupted())实现死循环。

但是必须注意到,每次循环中的提取动作执行是有前提条件的——队列中有对象事件。如果在有对象事件时,通知提取线程,这样可以避免提取线程一直霸占CPU “傻等”,使用线程的wait()notifyAll()可以达到这个目的。

提取线程的循环体内设置wait()进行中断等待,加入对象后,执行notifyAll(),这样提取线程将中断等待,从Queue中读取加入的对象。线程在中断等待时,将释放CPU的霸占,这样就有效率地利用了CPU,如图1-6所示。

nio

1-6  改进后的队列Queue模式

由此可见,并不是说使用了多线程就能提高系统性能,更重要的是还要注意提高CPU使用效率,防止Block发生。

提高多线程的使用效率还必须了解下列几点:

1)线程运行的次序并不是按照创建它们时的顺序来运行的,CPU处理线程的顺序是不确定的。如果需要确定,那么必须手工介入,使用setPriority()方法设置优先级,但是这种方法在Windows NT下有时也不一定有效果。

2)要避免大量线程运行时发生堵塞现象,可以通过设置线程优先级来实现,但是同时又必须注意到,在大量线程被堵塞时,最高优先级的线程先运行,但是不表示低级别线程不会运行,只是运行概率较小而已。

3)使用yield()会自动放弃CPU,有时比sleep更能提升性能。

4)检查所有可能Block的地方,尽可能多地使用sleepyield()以及wait();尽可能延长sleep(毫秒数)的时间;运行的线程不能超过100个;要注意到不同平台LinuxWindows以及不同JVM运行性能差别很大。

1.2.5  线程池

1.2.4节主要介绍了多线程使用时对资源争夺情况的处理,只要掌握几个基本原则,在一般情况下使用多线程实现应用处理就没有太大问题。

但是,当多线程数量很多时,每次启动线程的开销也非常大,有时,创建新线程的服务器在创建和销毁线程上花费的时间和消耗的资源,可能要比花在处理应用逻辑运算的的时间和资源要多得多。

除了创建和销毁线程的开销之外,活动的线程也会消耗系统资源。当应用系统突然遭遇巨大访问量访问时,服务器内存中会创建太多的线程,直至资源完全消耗,这对于应用系统的正常运行是有致命伤害的。

为了能在访问尖峰时限制线程开启数目,以及减少线程频繁创建和销毁带来的系统开销,提高系统的大访问量处理性能和速度,需要事先创建一定数量的线程供调用者循环反复使用,这也就是“池”技术。

线程池的基本原理也是基于队列Queue实现,通过不断查询队列Queue是否有可以运行的线程。如果有,就立即运行线程;如果没有,就锁定等待,直至有新的线程加入被触发解锁。

线程池作为一种比较成熟的技术,一般不需要自己开发设计,因为性能优良的线程池不是很容易开发出来的,必须解决下列几个问题:死锁、资源不足、并发错误、线程泄漏和请求过载等。

因此,可以使用一些现成的经过很多实践证明可靠的线程池软件,其中比较著名的是Doug Lea 编写了一个优秀的开放源码库util.concurrenthttp://gee.cs.oswego. edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html),它包括互斥、信号量、诸如在并发访问下执行得很好的队列和散列表之类以及几个工作队列实现。

使用PooledExecutor 类就可以将自己的线程放入线程池中运行。举例如下:

PooledExecutor pool = new PooledExecutor(new BoundedBuffer(20), 100);

pool.setMinimumPoolSize(10); //最小数是10

pool.setKeepAliveTime( -1); //线程一直运行

该语句设置了线程池的最大线程数是100,这样,保护系统不会因为访问量增加导致线程数目无限制增加。使用该线程池如下:

pool .execute(java.lang.Runnable 自己的线程);                                   

这一句实际上是将“自己的线程”加入一个队列中,而队列另外一端正开启着多个线程不断读取这个队列,一旦队列中有空闲的线程,ThreadWorker将读取并分配线程来运行它。execute代码如下:

public void execute(Runnable command) throws InterruptedException {

    for (;;) {                                                         //一直循环 运行

      synchronized(this) {

        if (!shutdown_) {                                  //确保线程池没有被关闭

          int size = poolSize_;                          //当前线程池中线程的数目

          // Ensure minimum number of threads

          if (size < minimumPoolSize_) {        //如果当前线程数目小于线程池最小数目

            addThread(command);                    //增加新线程,并运行command线程

            return;

          }

          //如果目前线程池中有超过或等于最小数目的线程

          //分配一个存在的空闲线程来运行command,handOff是队列

          if (handOff_.offer(command, 0)) {

            return;

          }

          // 如果不能分配已有的线程来运行command,那么创建一个新的线程

          if (size < maximumPoolSize_) {

            addThread(command);

            return;

          }

        }

      }

      // Cannot hand off and cannot create -- ask for help

      if (getBlockedExecutionHandler().blockedAction(command)) {

        return;

      }

    }

  }

}


由以上代码可见,PooledExecutor线程池运行原理是,当执行execute加载一个应用系统的线程池时,线程池内部首先检查当前线程数目有无达到线程池设定的最小数目。如果没有,启动新线程运行;如果已经达到了,那么检查有无空闲可再用的线程来运行;如果没有,就再创建新的线程,直至达到最大线程数目。

因此,使用线程池有两个最大好处:首先是循环使用,一经创建后,空闲的线程可以被再次反复使用,提高了线程运行效率;其次是有最大线程数限制,当系统达到最大数目时,将不再分配线程池中线程来运行,从而保证了系统资源的可用性,防止系统资源不足,甚至耗尽。

在实际应用中,要根据处理内容来决定是否使用线程池,并不是说所有处理功能都可以使用线程池实现。只有那些处理完成速度快,可以在很短时间内结束访问的处理功能才可以使用线程池。J2EE中是采取线程池结合对象池的底层运行机制,通过JSP/Servlet线程访问对象池中的无状态Session Beans来实现复杂耗时功能的处理,这种处理机制已经证明是一种成熟稳定的办法。

本系统中,聊天系统核心功能的处理可以采取线程池设计,这样可以加快对每次聊天信息的处理时间和提高处理速度。


1.2.6  非堵塞I/O

传统网络系统运行的性能瓶颈通常在I/O读写,包括对端口和文件的操作。过去,当打开一个SocketI/O通道后,使用下列语句:

Socket socket = new Socket(url,port);

InputStream in = socket.getInputStream();

while(!Thread.interrupted()){

int byteRead = in.read();

}

 

其中,read()守候在端口边不断读取传输过来的字节内容,如果读取不到任何字节内容,read()也只能等待,这会使得整个程序系统被锁住,影响程序系统继续做其他事情。一个普遍的改进办法就是开设线程,让线程去等待或轮询。但是,这种做法相当耗费资源的。更主要的是,当线程轮询后,若有事件发生,只有等到线程下次轮询时才会知道。即没有一种这样的途径:当有事件发生时,能够主动发出通知。

在前面讨论中,非堵塞I/O 作为Reactor模式的实现,实际上提供了一种事件发生、自我激活、主动通知的机制。因此使用非堵塞I/O将大大提高系统的I/O处理性能。

非堵塞I/O中有3个重要的类:SelectorSelectionKey Channel

Selector 实际是一个Reactor类,主要监视管理一系列SelectionKey,每个SelectionKey代表一种ChannelSelector之间的关系。在某个Channel上如果发生某种连接事件,Selector将会自动激活产生一个SelectionKey对象。即SelectionKey属于事件对象(Event Object),是动态的,每发生一个连接事件就产生一个SelectionKey对象。

从被激活的SelectionKey中,外界可以知道每个Channel发生的具体事件类型,这些事件包括:是否发生连接、是否可以读或者是否可以写等。

从以上分析可以看出,Selector有自我激活的能力。使用Selector时,只要告诉它需要关注的特定事件,Selector将会一直监视这种特定事件,一旦发生,就发出通知。类似火警警报器,一旦发现失火事件,立即会主动激活报警。

由于Selector只负责事件发生,不负责事件处理,事件处理是由开发者编制程序实现,因此,使用者需要自己建立一套获取发生事件的机制。

总之,非堵塞I/O的使用包括两大部分:注册事件和获取事件。下面将简单演示一下这两部分的使用实现。

首先,需要向Selector注册外界感兴趣的事件,创建Selector对象如下:

Selector selector = Selector.open();                                             

Selector selector = SelectorProvider.provider().openSelector();                       


Selector是一个观察者,那么它观察谁?当然是连接通道Channel,这种Channel是一种SelectableChannel,即可以和Selector发生联系的ChannelSelectableChannel常用的有两种:SocketChannelServerSocketChannel,这两种SelectableChannel的区别是可注册的事件不一样:

·       ServerSocketChannel 对应事件:OP_ACCEPT

·       SocketChannel 对应事件:OP_CONNECTOP_READOP_WRITE

后者一般使用在服务器端,可以从中知道有无可以接受的客户端连接。创建ServerSocketChannel如下:

ServerSocketChannel  sc = ServerSocketChannel.open();

创建一个ServerSocketChannel后,需要将其和主机端口进行绑定,例如和192.168.0.18009端口绑定:

InetSocketAddress address = new InetSocketAddress("192.168.0.1", 8009);

sc.socket().bind(address);

sc.configureBlocking(false);  //设置为非堵塞

现在如果需要从这个ServerSocketChanne了解有无可接受的客户端连接,语法如下:

SelectionKey acceptKey = sc.register( selector, SelectionKey.OP_ACCEPT )

上面一条语句是用Selector注册ServerSocketChannel实例,返回一个Key实例,通常SelectionKey对象都是线程安全型的,但是修改感兴趣的事件操作时,这个方法是被标记为同步的,即在调用interestOps()方法时会锁定一段时间,因此,实际应用中,如果有一个以上的线程来调用同一个Selector对象时,需要使用Selector.wakeup()来解锁。

以上非堵塞I/O的注册事件工作已经准备就绪,那么,在正常运行中,如果Selector发现了事先注册的事件,如何传递出来呢? 这就需要建立一个事件获取通道。

其实,获取事件时,只要执行语句selector.select(),这将触发系统内部自动检查所有使用这个Selector注册的Channel状态。如果在某个Channel发现有感兴趣事件发生,这条语句的返回结果将大于0,通过这个信息外界可以知道有网络连接事件发生了,那么又如何知道是哪个具体Channel发生的呢?

使用selector.selectedKeys()获取一个SelectionKey结果集,遍历这个结果集,通过每个SelectionKey就可以找到发生事件的Channel,这样可以从这个Channel进行读写数据,如图1-7所示。

nio

1-7  非堵塞I/O原理图

1-7基本展示了非堵塞I/O的原理结构这部分结构主要实现了Reactor模式中的事件到达部分当有读或写等任何实现注册的事件发生时可以从Selector中获得相应的SelectionKeySelectionKey可以找到相应的Channel从而能获得客户端发送过来的数据。

1.3  Socket核心设计和实现

通过前面的系统设计,基本解决了本系统实现的主要技术问题,下面将就具体的实现细节展开讨论。

整个系统的核心底层是非堵塞I/O技术,通过使用这一新技术,可以实现底层网络I/O的无堵塞、流畅地读写,为整个系统的高性能运行奠定了坚实的基础。

非堵塞的Socket I/O有两大部分:服务器和客户端。在两端都将采取这一新技术,根据TCP/UDP不同,又分别有两套Socket详细实现。

非堵塞的Socket I/O实现和以往堵塞I/O的实现在编程上有所不同,以前堵塞I/OSocket读写是一种被动行为,即new Socket这些语句可以根据自己系统的应用要求放置在任何位置,可以由程序员自己任意安排的,而非堵塞I/OSocket读写则不一样,它类似一个主动的、有自己“意志”行为的独立线程(因为使用了Reactor模式),所以,什么时候读取数据,什么时候写入数据不是由程序员自己能掌控的,是由Selector决定的。

因此这两种I/O模式的不同使用,决定了不同的编程模式和思维习惯,从堵塞I/O转到非堵塞I/O上,对程序员有些考验。

下面将先讨论服务器的Socket非堵塞I/O实现。

1.3.1  TCPReactor模式

Reactor模式是属于一种自我触发、自我激活的模式非堵塞I/OSelector实现了Reactor模式主要部分将连接事件自我触发SelectionKey事件形式爆发出来。

因此,只要建立一个线程类,反复检查Selector中是否有触发的SelectionKey,如果有,就再次触发对这些事件进行相应的处理,建立一个名为Reactor的线程类。在这个Reactor类中,将使用Selector的事件触发机制,触发本应用系统的事件处理机制。

首先以基于TCP连接的Socket实现为例,在Doug Lea 的《Scalable IO in Java》这本电子文档中,使用Reactor模式很好地实现了事件的自触发机制,如图1-8所示。

nio

1-8  服务器处理连接事件

在图1-8ReactorSelector注册了一个关于TCP的连接事件OP_ACCEPT是否有可接受的连接事件)。当客户端第一次开始连接服务器时OP_ACCEPT事件将激活Reactor将检测到这个激活事件对象SelectionKeySelectionKeyattachment中获取Acceptor线程对象直接运行Acceptor线程。

Acceptor将完成两件事情:

1)向Selector注册了一个新的连接事件OP_READ(是否可以读取数据)。这是假设客户端一旦连接上服务器后,将首先向服务器发送数据,一旦TCP连接握手成功,服务器首先要处于准备读取数据的状态。

2)更改SelectionKey中的attachment,修改为Handler线程对象,这是一个处理读取或写入数据的线程类。

当客户端发送数据到服务器时,可读取事件OP_READ发生了,Reactor又检测到这个事件对象SelectionKey,从SelectionKeyattachment中获取Handler线程对象,立即运行这个线程。

Handler线程从SelectionKey中提取SocketChannel,再从这个Channel中读取数据,然后向Selector注册一个新的连接事件OP_WRITE,以便服务器在处理完成读取的数据后,再写入发送到客户端。

OP_WRITER事件发生时,Handler线程又开始运行,这次是向SocketChannel写入数据,写入完成后,向Selector再注册新的连接事件OP_READ,这样一个请求/响应模式的数据处理基本完成,准备进入下一个循环。

创建Reactor类如下(程序1-1):

程序1-1

public class TCPReactor implements Runnable {

  private final static String module = TCPReactor.class.getName();

  private final Selector selector;               //Selector 实例

  private final ServerSocketChannel sc;       //SeletableCannel一个实现

 

  public TCPReactor (int port) throws IOException {

    selector = Selector.open();                      //创建Selector实例

    sc = ServerSocketChannel.open();         //创建ServerSocketChannel实例

    InetSocketAddress address =

        new InetSocketAddress(InetAddress.getLocalHost(), port);

    sc.socket().bind(address);                      //绑定ServerSocketChannel

    Debug.logVerbose("-->Start host:"+ InetAddress.getLocalHost()+" port=" + port);

    sc.configureBlocking(false);                  //设置为非堵塞

    //selector注册该channel感兴趣的事件为OP_ACCEPT

    SelectionKey sk = sc.register(selector, SelectionKey.OP_ACCEPT);

    //利用skattache功能绑定Acceptor 如果有事件触发Acceptor

    sk.attach(new Acceptor(selector, sc));

    Debug.logVerbose("-->attach(new Acceptor()!");

  }

 

  public void run() {

    try {

      while (!Thread.interrupted()) {  //反复运行,检查是否有触发的key

        selector.select();

        Set selected = selector.selectedKeys();

        Iterator it = selected.iterator();

        //Selector如果发现channel有事件发生,进行key的遍历

        while (it.hasNext())

            //来一个事件 第一次触发一个accepter线程

            //以后触发SocketReadHandler

            dispatch( (SelectionKey) (it.next()));

        selected.clear();

       }

    } catch (IOException ex) {

      Debug.logError("reactor stop!" + ex, module);

    }

  }

   //运行AcceptorSocketReadHandler

  private void dispatch(SelectionKey k) {

    Runnable r = (Runnable) (k.attachment());

    if (r != null) {

         Debug.logVerbose("-->dispatch running");

         r.run();

    }

  }

}

在线程的run()方法中,通过while (!Thread.interrupted())语句不断地对Selector进行事件检查,一旦有事先注册的关注的事件发生,运行dispatch(SelectionKey k)方法进行分配处理,在dispatch方法中,从SelectionKeyattachment中获得的一个线程,然后启动这个线程,这样,获取发生事件后,同时也驱动了对事件的进一步处理。

那么,这个线程是如何被赋予SelectionKey呢?原来在前面有一句:

sk.attach(new Acceptor(selector, sc));

SlectionKey有两种处理附件attachment的方法:

public abstract class SelectionKey

{

 …

public final Object attach (Object ob)   //类似setAttachment

public final Object attachment( )       //类似getAttachment

}

首先通过attach将一个对象和SelectionKey发生联系,然后再通过attachment( )获得这个对象,这个对象可以是任何业务对象、处理器或另外一个Channelattach只是保存对象的引用,在使用完成这个功能后,要使用attach(null)来清除附件对象的引用,以便垃圾回收机制能够回收这个附件对象。

SelectionKey注册自己的特定对象用如下语句:

SelectionKey key = channel.register (selector, SelectionKey.OP_READ, myObject);

等同于下列语句:

SelectionKey key = channel.register (selector, SelectionKey.OP_READ);

key.attach (myObject);

本系统中是采取后者做法:

sk.attach(new Acceptor(selector, sc));

那么被attach的线程对象Accpetor是对事件实行进一步处理的,注意一下事先注册的事件是SelectionKey.OP_ACCEPT,即系统运行开始时,第一个关注的事件总是OP_ACCEPT:是否有可接受的网络连接,这是服务器运行后一直应该关注的头等事件。

如果有这样的事件发生,那么就会激活Accpetor线程对象,从而启动一个Acceptor线程,在这个线程中,将准备下一步工作,就是再向Selector注册其他事件,例如这个连接是否可以读出或是否可以写入数据等。代码如下(程序1-2):

程序1-2

public class Acceptor implements Runnable {

  private final Selector selector;

  private final ServerSocketChannel ssc;

  public Acceptor(Selector selector, ServerSocketChannel ssc) {

    this.selector = selector;

    this.ssc = ssc;

  }

 

  public void run() {

    try {

      Debug.logVerbose("-->ready for accept!");

      SocketChannel sc = ssc.accept();

      if (sc != null) {

        sc.configureBlocking(false);                             //设定为非堵塞

        SelectionKey sk = sc.register(selector, 0);        //注册这个SocketChannel

 

        //同时将SelectionKey标记为可读,以便读取

        sk.interestOps(SelectionKey.OP_READ);

        selector.wakeup();                        //因为interestOps,防止Selector死锁

        sk.attach(new Handler(sk, sc));                        //携带Handler对象

      }

    } catch (Exception ex) {

      Debug.logVerbose("accept stop!" + ex);

    }

  }

}

Accpetor中,从ServerSocketChannel获得SocketChannel实例,这两个Channel可注册的事件是不一样的,后者可以注册是否可读或可写等事件。Accpetor代码中注册了是否可以读SelectionKey.OP_READ的事件,然后attachHandler线程对象。

这样,Selector将一直关注OP_READ事件,一旦有这类事件发生,将激活attachmentHandler线程的运行。Handler在可读事件发生后启动,就是从SocketChannel中读取客户端传送的数据了,Handler代码如下(程序1-3):

程序1-3

public class TCPHandler implements Runnable {

  private final static String module = TCPHandler.class.getName();

  private final SocketChannel sc;

  private final SelectionKey sk;

  private SocketHelper socketHelper;                  //Socket读写帮助类

 

  public TCPHandler (SelectionKey  sk, SocketChannel sc) throws IOException {

    this.sc = sc;

    this.sk = sk;

    socketHelper = new SocketHelper();

    Debug.logVerbose(" SocketReadHandler prepare ...", module);

  }

 

  public void run() {                                             //线程run方法

    Debug.logVerbose("Handler running ...", module);

    try {

      if (state == READING) read();                  //读取数据

      else if (state == SENDING) send();             //写入数据

    } catch (Exception ex) {

      Debug.logError("readRequest error:" + ex, module);

      socketHelper.close(sc);

      sk.cancel();

    }

  }

  //SocketChannel中读取数据

  private void read() throws Exception{

    try {

      //Socket中读取byte[]数组

    byte[] bytes = socketHelper.readSocket(sc);

      if (bytes.length == 0) throw new Exception();

    //实现服务器聊天核心处理功能,这里暂时打印出来,方便测试

    System.out.println(" ge result is :" + new String(bytes));

 

    state=SENDING;

      sk.interestOps(SelectionKey.OP_WRITE); //注册新的事件

  } catch (Exception ex) {

      throw new Exception(ex);

  }

  }

  //SocketChannel写入数据

  private void send()throws Exception{

    try {

      //写入测试数据

      String request1 = "come back";

      System.out.println(" send result is :" + request1);

    socketHelper.writeSocket(request1.getBytes(),sc);

    state=READING;

    sk.interestOps(SelectionKey.OP_READ);

  } catch (Exception ex) {

       throw new Exception(ex);

  }

  }

}

Handlerread方法中,简单地从SocketChannel中读取Message一个实例,然后打印出来,下一步可以在这里启动新的线程,进行聊天具体处理。如获取这个聊天信息的接受方用户ID,然后以用户ID寻找出它的SocketChannel,从而向对方用户发出该信息。

为了实现一个分布式的服务器环境,可以使用JMS这样的消息处理系统,通过查询该服务器内的用户名单,如果对方用户ID不是登录本服务器,那么通过JMS将消息发送给它。