MINA学习

简介: 工作中遇到了MINA,以前没接触过,所以就想搞搞明白这东西干嘛的,怎么玩起来的。最近花了几天时间去学习,这里做个小结以整理知识加深理解。     Apache MINA是一个网络应用框架框架,用来帮助用户简单地开发高性能和高可靠性的网络应用程序。

工作中遇到了MINA,以前没接触过,所以就想搞搞明白这东西干嘛的,怎么玩起来的。最近花了几天时间去学习,这里做个小结以整理知识加深理解。 
    Apache MINA是一个网络应用框架框架,用来帮助用户简单地开发高性能和高可靠性的网络应用程序。它提供了一个通过Java NIO在不同的传输例如TCP/IP和UDP/IP上抽象的事件驱动的异步API。(至少知道是搞网络的框架了~~~) 
    直接看个例子吧!!! 
    实现一个简单的计算器服务,客户端发送要计算的表达式给服务器,服务器返回计算结果。比如客户端发送2+2,服务器返回4。 
    (怎么就直接上代码了,还糊涂着呢!!!~~~哈哈哈  Code Monkey总是如此的直接~~) 
CalculatorHandler.java 

 1 public class CalculatorHandler extends IoHandlerAdapter {  
 2     private ScriptEngine jsEngine = null;  
 3   
 4     public CalculatorHandler() {  
 5         ScriptEngineManager sem = new ScriptEngineManager();  
 6         jsEngine = sem.getEngineByName("JavaScript");  
 7         if (jsEngine == null) {  
 8             throw new RuntimeException("Can't find JavaScript Engine");  
 9         }  
10     }  
11   
12     @Override  
13     public void messageReceived(IoSession session, Object message)  
14             throws Exception {  
15         String expression = message.toString();  
16         if ("quit".equalsIgnoreCase(expression.trim())) {  
17             session.close(true);  
18             return;  
19         }  
20         try {  
21             Object result = jsEngine.eval(expression);  
22             session.write("=" + result.toString());  
23         } catch (Exception e) {  
24             session.write("Wrong expression,try again.");  
25         }  
26   
27     }  
28 }  

CalculatorServer.java 

public class CalculatorServer {  
       private static final int PORT = 10010;  
  
       public static void main(String[] args) throws IOException {  
            IoAcceptor acceptor = new NioSocketAcceptor();  
            acceptor.getFilterChain().addLast( "logger" , new LoggingFilter());  
            acceptor.getFilterChain().addLast(  
                         "codec" ,  
                         new ProtocolCodecFilter( new TextLineCodecFactory(Charset  
                                    . forName( "UTF-8"))));  
            acceptor.setHandler( new CalculatorHandler());  
            acceptor.bind( new InetSocketAddress( PORT));  
      }  
}  

运行CalculatorServer并使用telnet登录进行测试 

它怎么就跑起来了呢,怎么这么神奇呢!? 
    据观察mian方法中实例化了IoAcceptor,向它的“过滤链”末尾添加了两个Filter,然后设置了Handler(对,就是上面的CalculatorHandler),然后绑定到一个地址。 
    看名字两个Filter分别和日志,编解码相关,都是MINA框架自带了,就先不看了。绑定地址也没什么好说的,重点肯定在这个CalculatorHandler里面了。 
    CalculatorHandler做了什么?继承IoHandlerAdapter并重写了messageReceived方法。messageReceived顾名思义就是接收到消息后执行这个方法。在这个方法内调用jsEngine.eval(expression)对接收到的表达式进行计算并将结果写回到session中。 

    根据上面的分析,大致可以推断这样的结论(推断~~推断~~其实就是“你猜啊!”): 
    MINA中靠session去传递消息。IoAcceptor是核心,它绑定在一个地址上(有点像监听器),并有一个Filter链和Handler。它是事件驱动的,发生某些事件时触发指定的方法。大致的执行过程是监听地址上发生某些事件,产生session并进过Filter链的处理最终进入Handler执行业务逻辑的处理。。。 


    程序也跑起来了,怎么跑的也去“猜”了,是时候了一探究竟了。。。一探究竟?Code Monkey竟然会用成语!!!啊哈哈哈哈 
http://mina.apache.org/ 官网就在这里,你看,或者不看,它还在那里。源码就在上面,你下,或者不下,随你吧。。。 
    Apache MINA is a network application framework which helps users develop high performance and high scalability network applications easily. It provides an abstract ·event-driven · asynchronous API over various transports such as TCP/IP and UDP/IP via Java NIO. 

    Apache MINA is often called: 

    NIO framework · library, 
    client · server framework · library, or 
    a networking · socket library. 

    大致看一样这框架在系统中的位置吧 

    MINA处于应用程序和底层网络之间,帮助用户更简单的开发网络应用。 
    MINA框架的分层及组件(搞过WEB开发的一看肯定就明白了。。。) 

    基础MINA应用被分为三层: 
    I/O Service:执行真正的I/O 
    I/O Filter Chain:将字节过滤/转换成设计的数据结构(对象) 
    I/O Handler:业务逻辑 
    创建MINA应用必须: 
    创建I/O Service:选择已存在的或自己去创建 
    创建Filter Chain:可以使用MINA提供的Filter也可以自己实现 
    创建I/O Handler:写自己的逻辑 

既然主要的组件有这么几个,一个个看吧。 

  IoService的职责很清晰了就不说了,方法说说明看源码吧。AbstractIoService是IoService的一个抽象实现类,增加了一些默认的实现(这个类里实现而来设置Handler)。我们自己写代码的时候也有很多这样的设计,接口+一个抽象的实现类,带来的好处就不说了。 
    IoService有两个重要的子接口:IoAcceptor和IoConnector。MINA应用的服务端必须实现IoAcceptor,客户端必须实现IoConnector(当然,MINA提供了一些实现类)。 
    IoAcceptor类图 

 IoAcceptor几个实现类: 
    NioSocketAcceptor: the non-blocking Socket transport IoAcceptor 
    NioDatagramAcceptor: the non-blocking UDP transport IoAcceptor 
    AprSocketAcceptor: the blocking Socket transport IoAcceptor, based on ARP 
    VmPipeSocketAcceptor: the in-VM IoAcceptor 
    IoConnector类图 

  IoConnector几个实现类: 
    NioSocketConnector: the non-blocking Socket transport IoConnector 
    NioDatagramConnector: the non-blocking UDP transport IoConnector 
    AprSocketConnector: the blocking Socket transport IoConnector, based on ARP 
    ProxyConnector: a IoConnector providing proxy support 
    SerialConnector: a IoConnector for serial transport 
    VmPipeConnector:the in-VM IoConnector 
    根据自己的需求选择合适的IoAcceptor和IoConnector。 

    IoSession 
    Session是MINA的核心:每次客户端连接到服务器端都会创建一个新的session,并保持在内存中,直到连接断开。 
    Session的状态: 
    Connected: the session has been created and is avaiable 
    Idle: the session hasn't processed any request for at least a period of time(the period is configurable) 
        Idle for read: no read has actually been made for a period of time 
        Idle for write: no write has actually been made for a period of time 
        Idle for both: no read nor write for a period of time 
    Closing: the session is being closing (the remaining messages are being flushed,cleaning up is terminated) 
    Closed: the session is now closed, nothing else can be done to revive it 
    Session状态图 

 Filters 
    Filters处在IoService和IoHandler之间,过滤所有的I/O事件和请求。 
    一些“开箱即用”的过滤器: 
    LoggingFilter logs all events and requests. 
    LoggingFilter日志记录所有事件和请求。 
    PtotocolCodecFilter converts an incoming ByteBuffer into message POJO and vice versa. 
    PtotocolCodecFilter将传入的消息转换成POJO ByteBuffer,反之亦然。 
    CompressionFilter compresses all data. 
    CompressionFilter压缩所有数据。 
    SSLFilter adds SSL - TLS - StartTLS support. 
    SSLFilter添加SSL - TLS - StartTLS支持。 
    ... 
    
    Handler 
    IoHandler接口定义了如下7个方法: 
    sessionCreated 
    当一个新连接被建立,从一个I/O处理器调用该方法。应为该方法在同一个线程中被调用,所以该方法应尽量处理消耗时间较小的任务。 
    sessionOpened 
    当连接被打开后调用。该方法在sessionCreated之后调用。最大的不同是从另一个线程被调用。 
    sessionClosed 
    当连接被关闭后调用。 
    sessionIdle 
    session被闲置的时候调用。 
    exceptionCaught 
    当任何异常被抛出时调用。当异常是IOException时,MINA将自动关闭连接connection。 
    messageReceived 
    当接收到消息的时候被调用 
    messageSent 
    当发送消息时被调用 

    了解以上内容后去理解开头的例子就很容易了。 
    当我们使用MINA开始定义自己的协议进行网络通信的时候就需要自己实现编解码器了,这时有必要先了解一下IoBuffer这个类。 
    IoBuffer是MINA框架使用的byte buffer。 
    IoBuffer是ByteBuffer的替代品。为什么不适用ByteBuffer?直接原因有一下两点: 
    1.ByteBuffer不提供像get/putString,get/putAsciiInt这样方便的getters和putters方法。 
    2.ByteBuffer的固定容量使记录变长数据非常的不方便。 
    注意:IoBuffer在MINA3中可能被放弃,因为仅仅为了扩充容量而拓展现有的buffer。MINA3可能使用InputStream去替代byte buffer或其他方案。 
    
    IoBuffer操作 
    分配一个新的Buffer 
    public static IoBuffer allocate(int capacity) 
    public static IoBuffer allocate(int capacity, boolean direct) 
    capacity:Buffer的容量 
    direct:是否直接缓冲。true to get direct buffer,false to get heap buffer 
    也可以通过以下方式获取Buffer 
    // 默认使用heap buffer 
    IoBuffer.setUseDirectBuffer (true ); 
    IoBuffer buffer = IoBuffer.allocate(1024); 

    创建自动扩展的Buffer 
    public abstract IoBuffer setAutoExpand( boolean autoExpand); 
    创建自动收缩的Buffer 
    public abstract IoBuffer setAutoShrink( boolean autoShrink); 

------------------------------------------------------------------------------华丽丽的分割线------------------------------------------------------------------------------



    是骡子是马拉出来溜溜,是时候自己动手写点东西(搞两个WEB项目吧,一个Server一个Client,Client向Server发送消息并获取返回内容显示在页面上)。 
    回顾一下,建立MINA应用的步骤吧。实现IoService,实现自己的编解码器,实现IoHandler,加入自己的业务逻辑...... 
    从Client入手吧。接收页面内容的Action什么的就不说了,直接从Service(这个Service指Action、Service、DAO中的Service)说起吧。 

@Service("remoteService")  
public class RemoteServiceImpl implements RemoteService {  
    private static final Log log = LogFactory.getLog(RemoteServiceImpl.class);  
    @Autowired  
    private IoConnector ioConnector;  
  
    @Override  
    public ServerResponse executeRequest(ServerRequest request) {  
        IoSession session = null;  
        try {  
            ioConnector.getSessionConfig().setUseReadOperation(true);  
            session = ioConnector.connect().awaitUninterruptibly().getSession();  
            session.write(request);  
            ReadFuture rf = session.read().awaitUninterruptibly();  
            Object message = rf.getMessage();  
            if (message != null) {  
                return (ServerResponse) message;  
            }  
        } catch (Exception e) {  
            // TODO: handle exception  
            log.error("RemoteService execute request error.", e);  
            e.printStackTrace();  
        } finally {  
            if (session != null) {  
                session.close(true);  
            }  
        }  
        return null;  
    }  
  
}  

    没什么内容自动注入这些Spring相关的不看了,下面会给出配置。 
    上面的方法使用的ServerRequest、ServerResponse是两个简单的POJO。ServerRequest有一个message属性代表要发送的消息,ServerResponse有一个responseStr属性代表返回的字符串。 
    一些<bean>的配置

<!-- 属性编辑器 -->  
<bean class="org.springframework.beans.factory.config.CustomEditorConfigurer">  
    <property name="customEditors">  
        <map>  
            <entry key="java.net.SocketAddress">  
                <bean class="org.apache.mina.integration.beans.InetSocketAddressEditor" />  
            </entry>  
        </map>  
    </property>  
</bean>  
  
<bean id="executorFilter" class="org.apache.mina.filter.executor.ExecutorFilter"  
    destroy-method="destroy" />  
  
<bean id="codecFilter" class="org.apache.mina.filter.codec.ProtocolCodecFilter">  
    <constructor-arg>  
        <bean class="包名.ServerProtocolCodecFactory">  
            <constructor-arg name="encoder">  
                <bean class="包名.ServerRequestEncoder" />  
            </constructor-arg>  
            <constructor-arg name="decoder">  
                <bean class="包名.ServerResponseDecoder" />  
            </constructor-arg>  
        </bean>  
    </constructor-arg>  
</bean>  
  
<bean id="filterChainBuilder"  
    class="org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder">  
    <property name="filters">  
        <map>  
            <entry key="codecFilter" value-ref="codecFilter" />  
        </map>  
    </property>  
</bean>  
  
<bean id="clientHandler" class="包名.ClientHandler" />  
  
<bean id="ioConnector" class="org.apache.mina.transport.socket.nio.NioSocketConnector"  
    destroy-method="destroy">  
    <!-- “属性编辑器” 一些属性在XML 中写为String 类型,但实际JAVA 类型中要 求注入的是一个其他的对象类型,你需要对此做出转换。譬如:下面的defaultRemoteAddress   
        我们传入的是一个字符串,但实际上NioSocketConnector 中需要的是一个InetSocketAddress,这里就需要一个编辑器将XML   
        中注入的字符串构造为 InetSocketAddress 对象。在Mina 自带的包org.apache.mina.integration.beans   
        中提供 了很多的属性编辑器,如果你发现某个属性的编辑器没有提供,可以自行编写 InetSocketAddress -->  
    <property name="defaultRemoteAddress" value="${client.host}:${client.port}" />  
    <property name="filterChainBuilder" ref="filterChainBuilder" />  
    <property name="handler" ref="clientHandler" />  
</bean>  

    配置信息中“包名.XXX”(“包名”中包含这公司信息就隐去了)的类都是自己实现的,一共两块内容,Handler和编解码器CodecFilter。 
    CodecFilter负责完成ServerRequest、ServerResponse和byte buffer之间的转化。 
    代码就放这里吧(ServerProtocolCodecFactory ServerRequestEncoder ServerResponseDecoder)。 

public class ServerProtocolCodecFactory implements ProtocolCodecFactory {  
  
    private static final long serialVersionUID = 3338741166925608943L;  
    private ProtocolEncoder encoder;  
    private ProtocolDecoder decoder;  
  
    public ServerProtocolCodecFactory(ProtocolEncoder encoder,  
            ProtocolDecoder decoder) {  
        super();  
        this.encoder = encoder;  
        this.decoder = decoder;  
    }  
  
    @Override  
    public ProtocolDecoder getDecoder(IoSession session) throws Exception {  
        return decoder;  
    }  
  
    @Override  
    public ProtocolEncoder getEncoder(IoSession session) throws Exception {  
        return encoder;  
    }  
}  
  
  
public class ServerRequestEncoder implements ProtocolEncoder {  
    private final static Log log = LogFactory  
            .getLog(ServerRequestEncoder.class);  
    private final static Charset charset = Charset.forName("UTF-8");  
  
    @Override  
    public void encode(IoSession session, Object message,  
            ProtocolEncoderOutput out) throws Exception {  
        // 将所需传输数据编码转化放入IoBuffer,将得到的IoBuffer写入ProtocolEncoderOutput中  
        log.info("####################字符编码####################");  
        if (!(message instanceof ServerRequest)) {  
            throw new IllegalArgumentException("get unknow type:"  
                    + message.getClass().getName()  
                    + ",should be [ServerRequest].");  
        }  
        ServerRequest request = (ServerRequest) message;  
        IoBuffer buff = IoBuffer.allocate(100).setAutoExpand(true);  
        buff.putString(request.getMessage(), charset.newEncoder());  
        buff.putString(LineDelimiter.DEFAULT.getValue(), charset.newEncoder());  
        buff.flip();  
        out.write(buff);  
    }  
  
    @Override  
    public void dispose(IoSession session) throws Exception {  
        log.info("####################Dispose####################");  
    }  
  
}  
  
  
public class ServerResponseDecoder implements ProtocolDecoder {  
    private final static Log log = LogFactory  
            .getLog(ServerResponseDecoder.class);  
    private final static Charset charset = Charset.forName("UTF-8");  
    private IoBuffer buff = IoBuffer.allocate(100).setAutoExpand(true);  
  
    @Override  
    public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out)  
            throws Exception {  
        log.info("###############decode############");  
        while (in.hasRemaining()) {  
            byte b = in.get();  
            if (b == '\n') {  
                buff.flip();  
                byte[] bytes = new byte[buff.limit()];  
                buff.get(bytes);  
                String message = new String(bytes, charset);  
                ServerResponse response = new ServerResponse();  
                response.setResponseStr(message);  
                out.write(response);  
            } else {  
                buff.put(b);  
            }  
        }  
    }  
  
    @Override  
    public void finishDecode(IoSession session, ProtocolDecoderOutput out)  
            throws Exception {  
        log.info("################完成解码###################");  
    }  
  
    @Override  
    public void dispose(IoSession session) throws Exception {  
        log.info("#################Dispose##################");  
    }  
  
} 

    接着看Handler(其实一点内容都没有,继承了IoHandlerAdapter但都没里面的方法还都是调父类的,只是留着先,后期可以加入自己的逻辑) 

public class ClientHandler extends IoHandlerAdapter {  
    private static final Log log = LogFactory.getLog(ClientHandler.class);  
  
    @Override  
    public void exceptionCaught(IoSession session, Throwable cause)  
            throws Exception {  
        super.exceptionCaught(session, cause);  
        log.error("Client handler caught an error", cause);  
    }  
  
    @Override  
    public void messageReceived(IoSession session, Object message)  
            throws Exception {  
        super.messageReceived(session, message);  
    }  
  
    @Override  
    public void messageSent(IoSession session, Object message) throws Exception {  
        log.debug("ClientHandler.messageSent"+message);  
        super.messageSent(session, message);  
    }  
} 

    这样客户端就完成了,当然,你要加发送消息的页面,显示返回结果的页面。 

    服务端和客户端没什么差别,无非Client用的IoConnector改为IoAcceptor......要注意的是,Server的编解码器。解码器Decoder能将接收到的消息转成正确的对象,编码器Encoder编码结果能被客户端的解码器解码(就是这样一个对应关系ServerDecoder-ClientEncoder,ServerEncoder-ClientDecoder,编码成什么样就可以自己约定了)。 

(这么烂的图都敢上~~~) 

------------------------------------------------------------------------------华丽丽的分割线又来了------------------------------------------------------------------------


    总不能是发这么个字符串过去回来个字符串吧,实际项目中没这么简单的需求吧???比如一个关键字和其他参数过去回来一堆数据一个List什么的还差不多。 
    要实现上面的需求,关键的就在于协议的定义了。发送过去的关键字和参数是怎么样的返回回来的数据结构是怎么要的,怎么进行编解码等等了。搞起搞起~~~~~~ 

    这块搞得时间比较长,但还是觉得写得不大好。不过至少也是周末时间努力的结果,大家看看吧。不多说,上代码。 
    Client端的编解码及协议。 
    Client编码在ServerRequest自身中实现,ServerRequestEncoder仅仅调用ServerRequest的方法获取结果并写入到ProtocolEncoderOutput中。 
    ServerProtocol中主要定义了一些协议中使用到的常量。 

    public static final Charset charset = Charset.forName("UTF-8");  
    public static final int protocolHeadLength = 4;  
    public static final String equalString = "=";  
    public static final char equalChar = '=';  
    public static final String separateString = "\5";  
    public static final char separateChar = '\5';  
    public static final String endString = "\7";  
    public static final char endChar = '\7';  

ServerRequest.java 

public class ServerRequest extends ServerProtocol {  
  
    private static final long serialVersionUID = 7447009933974403515L;  
    private static final Log log = LogFactory.getLog(ServerRequest.class);  
  
    private IoBuffer parameters;  
  
    public ServerRequest() {  
        super();  
        // direct设置为false获取heap buffer. heap buffer效率更高  
        this.parameters = IoBuffer.allocate(256, false).setAutoExpand(true);  
    }  
  
    public ServerRequest(Map<String, Object> paramaters) {  
        this();  
        this.setParameters(paramaters);  
    }  
  
    public void setParameters(Map<String, Object> parameters) {  
        if (parameters == null) {  
            return;  
        }  
        for (Entry<String, Object> entry : parameters.entrySet()) {  
            addParameter(entry.getKey(), entry.getValue());  
        }  
    }  
  
    public void addParameter(String key, Object value) {  
        if (key == null) {  
            return;  
        }  
        CharsetEncoder encoder = charset.newEncoder();  
        try {  
            this.parameters.putString(key, encoder);  
            this.parameters.putString(equalString, encoder);  
            if (value == null) {  
                this.parameters.putString("", encoder);  
            } else {  
                this.parameters.putString(value.toString(), encoder);  
            }  
            this.parameters.putString(separateString, encoder);  
        } catch (CharacterCodingException e) {  
            log.error("ServerRequest CharacterCodingException...", e);  
        }  
    }  
  
    public IoBuffer getProtocolBuffer() {  
        int dataLength = this.parameters.position();  
        // 构造传输的IoBuffer  
        IoBuffer transferBuffer = IoBuffer.allocate(protocolHeadLength  
                + dataLength, true);  
        // 向协议头中写入数据长度  
        transferBuffer.putInt(dataLength);  
        // 写入数据  
        this.parameters.flip();  
        transferBuffer.put(this.parameters);  
        transferBuffer.flip();  
        return transferBuffer;  
    }  
}  

     解码在ServerResponse中实现,ServerResponseDecoder仅调用ServerResponse的方法获取解码结果写入ProtocolDecoderOutput中。 
ServerDecoder.java 

public class ServerResponse extends ServerProtocol {  
  
    private static final long serialVersionUID = -2728053247051653694L;  
    private static final String FALSE = "F";  
    private static final String nextMark = "hasNext";  
    private static final String errorNoFlag = "errorNo";  
    private static final String errorInfoFlag = "errorInfo";  
    private String errorNo;  
    private String errorInfo;  
    private ByteBuffer headBuffer = ByteBuffer.allocate(protocolHeadLength);  
    private List<ServerResponseEntry> list = new ArrayList<ServerResponseEntry>();  
  
    public void parseBuffer(IoBuffer ioBuffer) {  
        if (ioBuffer == null) {  
            return;  
        }  
        for (int i = 0; i < protocolHeadLength && ioBuffer.hasRemaining(); i++) {  
            headBuffer.put(ioBuffer.get());  
        }  
        headBuffer.flip();  
        int dataLength = headBuffer.getInt();  
        // 写data到buffer  
        ByteBuffer dataBuffer = ByteBuffer.allocate(dataLength);  
        while (ioBuffer.hasRemaining() && dataBuffer.position() < dataLength) {  
            dataBuffer.put(ioBuffer.get());  
        }  
        dataBuffer.flip();  
        CharBuffer dataCharBuffer = charset.decode(dataBuffer);  
        boolean writeKey = true;  
        StringBuilder keyBuilder = new StringBuilder();  
        StringBuilder valueBuilder = new StringBuilder();  
        ServerResponseEntry entry = new ServerResponseEntry();  
        // 这段解析写的太烂了~~~~~~~~~  
        iteratingDataCharBuffer: while (dataCharBuffer.hasRemaining()) {  
            char character = dataCharBuffer.get();  
            switch (character) {  
            // 遇到'='  
            case equalChar:  
                writeKey = false;  
                break;  
            // 遇到'\5' 分隔符,将Key和Value写入Map  
            case separateChar:  
                writeKey = true;  
                String key = keyBuilder.toString();  
                String value = valueBuilder.toString();  
                if (key.equals(nextMark)) {  
                    // 遇到“下一条”标识,将记录加入到list中  
                    ServerResponseEntry item = new ServerResponseEntry();  
                    item.getRow().putAll(entry.getRow());  
                    list.add(item);  
                    entry.getRow().clear();  
                    if (value.equals(FALSE)) {  
                        // 最后一条记录,结束parseBuffer  
                        dataCharBuffer.mark();  
                        break iteratingDataCharBuffer;  
                    }  
                } else {  
                    System.out.println(key + "=" + value);  
                    entry.addKeyValue(key, value);  
                }  
                keyBuilder = new StringBuilder();  
                valueBuilder = new StringBuilder();  
                break;  
            default:  
                // 拼接KEY和VALUE  
                if (writeKey) {  
                    keyBuilder.append(character);  
                } else {  
                    valueBuilder.append(character);  
                }  
                break;  
            }  
        }  
        dataCharBuffer.reset();  
        keyBuilder = new StringBuilder();  
        valueBuilder = new StringBuilder();  
        // 构造错误代码和错误信息  
        Map<String, String> error = new HashMap<String, String>();  
        while (dataCharBuffer.hasRemaining()) {  
            char character = dataCharBuffer.get();  
            switch (character) {  
            // 遇到'='  
            case equalChar:  
                writeKey = false;  
                break;  
            case separateChar:  
                writeKey = true;  
                error.put(keyBuilder.toString(), valueBuilder.toString());  
                keyBuilder = new StringBuilder();  
                valueBuilder = new StringBuilder();  
                break;  
            default:  
                // 拼接KEY和VALUE  
                if (writeKey) {  
                    keyBuilder.append(character);  
                } else {  
                    valueBuilder.append(character);  
                }  
                break;  
            }  
        }  
        errorNo = error.get(errorNoFlag);  
        errorInfo = error.get(errorInfoFlag);  
    }  
  
    public List<ServerResponseEntry> getList() {  
        return list;  
    }  
  
    public String getErrorNo() {  
        return errorNo;  
    }  
  
    public String getErrorInfo() {  
        return errorInfo;  
    }  
}  

     注意哦,解码是针对服务端的编码结果的!!! 


    Server相关的主要三个类分别是ProtocolDTO,ProtocolReadDTO,ProtocolWriteDTO,分别是定义协议常量公共方法,解析接收到的内容,构造返回结果。 
    文字能力不写啊,还是直接给代码吧! 
ProtocolDTO.java 

public abstract class ProtocolDTO implements Serializable {  
  
    private static final long serialVersionUID = -6932714193726622067L;  
    public static final String keyWorld = "KEY";  
    public static final int headDataLength = 4;  
    public static final Charset charset = Charset.forName("UTF-8");  
    public static final char equalChar = '=';  
    public static final String equalString = "=";  
    public static final char separateChar = '\5';  
    public static final String separateString = "\5";  
    public static final char endChar = '\7';  
    public static final String endString = "\7";  
  
    @Override  
    public String toString() {  
        return ToStringBuilder.reflectionToString(this);  
    }  
  
    @Override  
    public boolean equals(Object obj) {  
        return EqualsBuilder.reflectionEquals(this, obj);  
    }  
  
    @Override  
    public int hashCode() {  
        return HashCodeBuilder.reflectionHashCode(this);  
    }  

ProtocolReadDTO.java 

public class ProtocolReadDTO extends ProtocolDTO {  
  
    private static final long serialVersionUID = 7447009933974403515L;  
    private ByteBuffer headBuffer = ByteBuffer.allocate(headDataLength);  
    private String key = null;  
    private Map<String, Object> data;  
  
    public void parseBuffer(IoBuffer ioBuffer) {  
        if (ioBuffer == null) {  
            return;  
        }  
        StringBuilder keyBuilder = new StringBuilder();  
        StringBuilder valueBuilder = new StringBuilder();  
        data = new HashMap<String, Object>();  
        // IoBuffer转ByteBuffer  
        for (int i = 0; i < headDataLength && ioBuffer.hasRemaining(); i++) {  
            headBuffer.put(ioBuffer.get());  
        }  
        headBuffer.flip();  
        int dataLength = headBuffer.getInt();  
        // 构造数据缓冲  
        ByteBuffer dataBuffer = ByteBuffer.allocate(dataLength);  
        while (ioBuffer.hasRemaining() && dataBuffer.position() < dataLength) {  
            dataBuffer.put(ioBuffer.get());  
        }  
        dataBuffer.flip();  
        // dataBuffer从ByteBuffer转成CharBuffer  
        CharBuffer dataCharBuffer = charset.decode(dataBuffer);  
        boolean writeKey = true;  
        while (dataCharBuffer.hasRemaining()) {  
            char character = dataCharBuffer.get();  
            switch (character) {  
            case equalChar:  
                writeKey = false;  
                break;  
            case separateChar:  
                // 遇到分隔符了,向map中写入key和value  
                if (keyBuilder.toString().equals(keyWorld)) {  
                    this.key = valueBuilder.toString();  
                } else {  
                    data.put(keyBuilder.toString(), valueBuilder.toString());  
                }  
                writeKey = true;  
                keyBuilder = new StringBuilder();  
                valueBuilder = new StringBuilder();  
                break;  
            case endChar:  
  
                break;  
            // 默认操作则将char加入到key和value字符串中  
            default:  
                if (writeKey) {  
                    keyBuilder.append(character);  
                } else {  
                    valueBuilder.append(character);  
                }  
                break;  
            }  
        }  
    }  
  
    public Map<String, Object> getData() {  
        return data;  
    }  
  
    public String getKey() {  
        return key;  
    }  
}  

ProtocolWriteDTO.java 

public class ProtocolWriteDTO extends ProtocolDTO {  
  
    private static final long serialVersionUID = -2728053247051653694L;  
  
    private static final String TRUE = "T";  
    private static final String FALSE = "F";  
    private static final String errorNoFlag = "errorNo";  
    private static final String errorInfoFlag = "errorInfo";  
    private String errorNo = "0";  
    private String errorInfo = "";  
    /** 标识,还有下一条记录 */  
    private static final String hasNext = "hasNext" + equalChar + TRUE;  
    /** 标识,没有下一条记录 */  
    private static final String noNext = "hasNext" + equalChar + FALSE;  
    /** 缓存ProtocolResponse类的属性和方法 */  
    private Map<Class<?>, ResponseDescription[]> responseProperties = new HashMap<Class<?>, ResponseDescription[]>();  
    private IoBuffer headBuffer;  
    private IoBuffer bodyBuffer;  
    private static final Object[] emptyObjects = new Object[] {};  
  
    public ProtocolWriteDTO(String errorNo, String errorInfo,  
            List<? extends ProtocolResponse> list) {  
        super();  
        this.errorNo = errorNo;  
        this.errorInfo = errorInfo;  
        makeIoBuffer(list);  
    }  
  
    /** 
     * 创建返回报文 
     *  
     * @param list 
     * @throws InvocationTargetException 
     * @throws IllegalAccessException 
     * @throws IllegalArgumentException 
     */  
    private void makeIoBuffer(List<? extends ProtocolResponse> list) {  
        makeBody(list);  
        makeHead();  
    }  
  
    private final void makeBody(List<? extends ProtocolResponse> list) {  
        if (list == null || list.size() == 0) {  
            return;  
        }  
        bodyBuffer = IoBuffer.allocate(256, false).setAutoExpand(true);  
        CharsetEncoder encoder = charset.newEncoder();  
        int last = list.size() - 1;  
        for (int i = 0; i <= last; i++) {  
            ProtocolResponse response = list.get(i);  
            // 读取response中所有的属性值  
            ResponseDescription[] readMethods = getResponseProperty(response  
                    .getClass());  
            try {  
                for (ResponseDescription des : readMethods) {  
                    Object objectValue = des.method.invoke(response,  
                            emptyObjects);  
                    String value = objectValue == null ? "" : objectValue  
                            .toString();  
                    bodyBuffer.putString(des.property, encoder)  
                            .putString(equalString, encoder)  
                            .putString(value, encoder)  
                            .putString(separateString, encoder);  
                }  
                // buffer中写入是否有下一条记录的标识  
                if (i == last) {  
                    // 最后一条  
                    bodyBuffer.putString(noNext, encoder).putString(  
                            separateString, encoder);  
                    // 加入错误代码和错误信息的内容  
                    bodyBuffer.putString(errorNoFlag, encoder)  
                            .putString(equalString, encoder)  
                            .putString(errorNo, encoder)  
                            .putString(separateString, encoder);  
                    bodyBuffer.putString(errorInfoFlag, encoder)  
                            .putString(equalString, encoder)  
                            .putString(errorInfo, encoder)  
                            .putString(separateString, encoder);  
                } else {  
                    bodyBuffer.putString(hasNext, encoder);  
                }  
                bodyBuffer.putString(separateString, encoder);  
            } catch (Exception e) {  
                // TODO: handle exception  
                e.printStackTrace();  
            }  
        }  
    }  
  
    private final void makeHead() {  
        headBuffer = IoBuffer.allocate(headDataLength);  
        headBuffer.putInt(this.bodyBuffer.position());  
    }  
  
    /** 
     * 获取输出的IoBuffer 
     *  
     * @return 
     */  
    public IoBuffer getResultBuffer() {  
        bodyBuffer.flip();  
        headBuffer.flip();  
        IoBuffer buffer = IoBuffer.allocate(  
                headBuffer.limit() + bodyBuffer.limit(), false);  
        buffer.put(headBuffer).put(bodyBuffer);  
        buffer.flip();  
        return buffer;  
    }  
  
    /** 
     * 记录属性和对应的方法 
     *  
     * @author linpl 
     * @version $Id: ProtocolWriteDTO.java,v 0.1 2012-11-23 下午02:26:38 linpl Exp 
     *          $ 
     */  
    private static final class ResponseDescription {  
        private String property;  
        private Method method;  
  
        public ResponseDescription(String property, Method method) {  
            super();  
            this.property = property;  
            this.method = method;  
        }  
    }  
  
    private ResponseDescription[] getResponseProperty(Class<?> clazz) {  
        ResponseDescription[] result = responseProperties.get(clazz);  
        if (result != null) {  
            return result;  
        }  
        // 向responseProperties加入clazz  
        PropertyDescriptor[] propertyDes = BeanUtils  
                .getPropertyDescriptors(clazz);  
        List<ResponseDescription> methods = new ArrayList<ResponseDescription>();  
        for (PropertyDescriptor pd : propertyDes) {  
            Method readMethod = pd.getReadMethod();  
            if (readMethod != null && !pd.getName().equals("class")) {  
                methods.add(new ResponseDescription(pd.getName(), readMethod));  
            }  
        }  
        result = methods.toArray(new ResponseDescription[methods.size()]);  
        responseProperties.put(clazz, result);  
        return result;  
    }  
  
    public void setErrorNo(String errorNo) {  
        this.errorNo = errorNo;  
    }  
  
    public void setErrorInfo(String errorInfo) {  
        this.errorInfo = errorInfo;  
    }  
}  

    这里还需要给出ProtocolResponse的说明。ProtocolResponse仅仅是一个标记借口,实现该接口的类都是POJO,如上面用的SimpleProtocolResponse。 
    SimpleProtocolResponse.java 

public class SimpleProtocolResponse implements ProtocolResponse {  
  
    private static final long serialVersionUID = 5507077327194923319L;  
  
    private String workNO;  
    private String name;  
    private String sex;  
    private String age;  
    private String telephone;  
    private String address;  
  
    public String getWorkNO() {  
        return workNO;  
    }  
  
    public void setWorkNO(String workNO) {  
        this.workNO = workNO;  
    }  
  
    public String getName() {  
        return name;  
    }  
  
    public void setName(String name) {  
        this.name = name;  
    }  
  
    public String getSex() {  
        return sex;  
    }  
  
    public void setSex(String sex) {  
        this.sex = sex;  
    }  
  
    public String getAge() {  
        return age;  
    }  
  
    public void setAge(String age) {  
        this.age = age;  
    }  
  
    public String getTelephone() {  
        return telephone;  
    }  
  
    public void setTelephone(String telephone) {  
        this.telephone = telephone;  
    }  
  
    public String getAddress() {  
        return address;  
    }  
  
    public void setAddress(String address) {  
        this.address = address;  
    }  
  
}  

 

 

 

    如果认真的看上面的内容,到这里估计都明白的,不用怎么说明了,看看结果吧。 
    注意:这里只是演示通讯协议实现,所以返回结果什么的都是写死在代码中的。像这样: 

@Override  
    public void messageReceived(IoSession session, Object message)  
            throws Exception {  
        log.debug("Receive message:" + message);  
        // 获取接收到的数据  
        ProtocolReadDTO readDTO = (ProtocolReadDTO) message;  
        // 根据readDTO中不同的key可以执行不同的业务代码  
        if (readDTO.getKey() == null) {  
            // 没有key,返回错误信息  
            ProtocolWriteDTO writeDTO = new ProtocolWriteDTO("-1", "请设置key值",  
                    null);  
            session.write(writeDTO);  
        }  
        // 如readDTO.getKey()为111,查询“基本信息”SimpleProtocolResponse  
        // 调用业务代码...这边演示就直接构造SimpleProtocolResponse了...  
        List<SimpleProtocolResponse> resultList = new ArrayList<SimpleProtocolResponse>();  
        SimpleProtocolResponse response = new SimpleProtocolResponse();  
        response.setWorkNO("1");  
        response.setName("张三");  
        response.setSex("男");  
        response.setAge("43");  
        response.setTelephone("13566488756");  
        response.setAddress("杭州市上城区......");  
        resultList.add(response);  
        SimpleProtocolResponse response1 = new SimpleProtocolResponse();  
        response1.setWorkNO("2");  
        response1.setName("李四");  
        response1.setSex("男");  
        response1.setAge("28");  
        response1.setTelephone("18864572231");  
        response1.setAddress("杭州市西湖区......");  
        resultList.add(response1);  
        ProtocolWriteDTO writeDTO = new ProtocolWriteDTO("0", "查询成功", resultList);  
        session.write(writeDTO);  
        // 如readDTO.getKey()为112,查询“基本信息”DetailProtocolResponse(未给出示例)  
        // ......  
    }  

 看一下页面的输出代码和结果吧(这块代码未必严谨,仅仅是显示一下返回内容,不是现在讨论的内容)。 

public class MockClientAction {  
    @Autowired  
    private RemoteService remoteService;  
  
    @RequestMapping(value = "/hello", method = RequestMethod.POST)  
    public String sentHello(@RequestParam("key") String key,  
            @RequestParam("age") String age, @RequestParam("sex") String sex,  
            ModelMap model) {  
        ServerRequest request = new ServerRequest();  
        request.addParameter("KEY", key);  
        request.addParameter("age", age);  
        request.addParameter("sex", sex);  
        ServerResponse response = remoteService.executeRequest(request);  
        if (response != null) {  
            if (response.getErrorNo() != null) {  
                System.out.println("[errorNo" + "=" + response.getErrorNo()  
                        + ";errorInfo=" + response.getErrorInfo() + "]");  
                model.put("errorNo",response.getErrorNo());  
                model.put("errorInfo",response.getErrorInfo());  
            }  
            List<ServerResponseEntry> list = response.getList();  
            for (ServerResponseEntry entry : list) {  
                System.out.print("[");  
                for (Entry<String, String> e : entry.getRow().entrySet()) {  
                    System.out.print(e.getKey() + "=" + e.getValue() + ";");  
                }  
                System.out.println("]");  
            }  
            model.addAttribute("result", response);  
        }  
        return "hello";  
    }  
}  

显示结果的vm内容 

接收到服务端返回数据(Receive Data):<br/>  
$errorNo<br/>  
$errorInfo<br/>  
<table>  
#foreach($entry in $result.list)  
    #set($row = $entry.row)  
    <tr>  
        <td>$row.workNO</td>  
        <td>$row.name</td>  
        <td>$row.sex</td>  
        <td>$row.age</td>  
        <td>$row.telephone</td>  
        <td>$row.address</td>  
    </tr>  
#end  
</table>  

     好长,好多代码,终于结束了。 
    (你竟然看到了这里,肯定是直接拉到下面的吧!!!哈哈哈哈哈哈哈) 

 

 

 

 

 

 

如果本文对您有帮助,点一下右下角的“推荐”
目录
相关文章
|
网络协议 编解码