NIO
NIO New IO 也叫走None-Blocking IO 非阻塞IO。
BIO blocking IO 阻塞IO。
无论是什么IO,都是在IO的基础上分离出来的。
java.nio.* 包包含了以下关键构造
- Buffers (缓冲) - 数据容器
- Chartsets (字符集) - 字节和 Unicode 码的翻译容器
- Channels (通道) - 表示实体 I/O 操作的连接
- Selectors (选择器) - 提供可选择的、多路非阻塞 IO
- Regexps (正则表达式) - 提供一些操作正则表达式的工具
简介
MINA 附带有对各种现有协议的实现:HTTP、XML、TCP、LDAP、DHCP、NTP、DNS、XMPP、SSH、FTP …在某种情况下,MINA 不仅可以作为一个 NIO 框架,也可以作为一个具有各种协议实现的网络传输层。MINA 近期要推出的新特性之一就是提供一个你可以使用现有的协议的集合。
应用架构
mina应用架构
一个基于 MINA 的应用看起来像什么?
mina是粘合剂,并且mina对下封装了TCP/UDP等等一系列协议。
MINA 是你的应用程序 (可能是一个客户端应用或者一个服务器端应用) 和基础网络层之间的粘合剂,可以基于 TCP、UDP、in-VM 通信甚至一个客户端的 RS-232C 串行协议。
你要做的仅仅是在 MINA 之上设计你自己的应用实现,而无需处理网络层的那些复杂业务。
mina本身的架构
三个架构层
概况来讲,基于 MINA 的应用划分为三个层次:
- I/O Service (I/O 服务) - 具体 I/O 操作
- I/O Filter Chain (I/O 过滤器链) - 将字节过滤/转换为想要的数据结构。反之亦然
- I/O Handler (I/O 处理器) - 这里实现实际的业务逻辑
如何自定义mina架构
因此,要想创建一个基于 MINA 的应用,你需要:
- 创建一个 I/O service - 从已存在的可用 service (*Acceptor) 中挑选一个或者创建你自己的
- 创建一个 Filter Chain - 从现有 Filter 中挑选,或者创建一个用于转换请求/响应的自定义 Filter
- 创建一个 I/O Handler - 处理不同消息时编写具体业务逻辑
当然,MINA 提供的东东不仅于此,你可能会注意其他的一些方面的内容,比如消息加密/解密,网络配置如何扩大规模,等等
服务端架构(数据处理流程)
从根本上说,服务器端监听一个端口以获得连入的请求,将其进行处理然后发送回复。服务器端还会为每个客户端 (无论是基于 TCP 还是基于 UDP 协议的) 创建并维护一个 session!
- I/O Acceptor 监听网络以获取连入的连接或者包
- 对于一个新的连接,一个新的 session 会被创建,之后所有来自该 IP 地址/端口号组合的请求会在同一 session 中处理
- 在一个 session 中接收到的所有包,将穿越上图中所示的 Filter Chain (过滤器链)。过滤器可以被用于修正包的内容 (比如转化为对象,添加或者删除信息等等)。对于从原始字节到高层对象的相互转换,PacketEncoder/Decoder 相当有用。
- 包或转化来的对象最终交给 IOHandler。IOHandler 可以用于实现各种具体业务需求。
session 的创建
只要一个客户端连接到了 MINA 服务器端,我们就要创建一个新的 session 以存放持久化数据。即使协议还没连接上,也要创建这么一个 session。
mina处理接收到的消息
假定 session 已被创建,新传入的消息将导致一个 selector 被唤醒。
客户端架构
客户端需要连接到一个服务端,发送消息并处理响应
- 客户端首先创建一个 IOConnector (用以连接 Socket 的 MINA Construct (构件)),开启一个服务器的绑定
- 在连接创建时,一个 Session 会被创建并关联到该连接
- 应用或者客户端写入 Session,导致数据在穿越 Filter Chain (过滤器链) 后被发送给服务器端
- 所有接收自服务器端的响应或者消息穿越 Filter Chain (过滤器链) 后由 IOHandler 接收并处理
总的来说,从network拿到数据经过我们mina的一顿操作后,再返回去。
配置TCP Server
我们以创建一个叫做 MinaTimeServer.java 的文件开始。初始化代码如下:
public class MinaTimeServer {
public static void main(String[] args) {
// code will go here next
}
}
这段程序对所有人来说都很简单明了。我们简单定义了一个用于启动程序的 main 方法。现在,我们开始添加组成我们服务器的代码。首先,我们需要一个用于监听传入连接的对象。因为本程序基于 TCP/IP,我们在程序中添加了 SocketAcceptor。
public class MinaTimeServer
{
private static final int PORT = 9123;
public static void main( String[] args )
{
IoAcceptor acceptor = new NioSocketAcceptor();
acceptor.bind( new InetSocketAddress(PORT) ); //默认绑定本地ip
}
}
如你所见,有一个关于 acceptor.setLocalAddress( new InetSocketAddress(PORT) ); 的调用
这个方法定义了这一服务器要监听到的主机和端口。最后一个方法是 IoAcceptor.bind() 调用。这个方法将会绑定到指定端口并开始处理远程客户端请求。
配置过滤器
接下来我们在配置中添加一个过滤器。这个过滤器将会日志记录所有信息,比如 session 的新建、接收到的消息、发送的消息、session 的关闭。
接下来的过滤器是一个 ProtocolCodecFilter。这个过滤器将会把二进制或者协议特定的数据翻译为消息对象,反之亦然。我们使用一个现有的 TextLine 工厂因为它将为你处理基于文本的消息 (你无须去编写 codec 部分)。
public class MinaTimeServer
{
public static void main( String[] args )
{
IoAcceptor acceptor = new NioSocketAcceptor();
acceptor.getFilterChain().addLast( "logger", new LoggingFilter() );
acceptor.getFilterChain().addLast( "codec", new ProtocolCodecFilter( new TextLineCodecFactory( Charset.forName( "UTF-8" ))));
acceptor.bind( new InetSocketAddress(PORT) );
}
}
配置处理器
接下来,我们将定义用于服务客户端连接和当前时间的请求的处理器。
处理器类是一个必须实现 IoHandler 接口的类。对于几乎所有的使用 MINA 的程序,这里都会变成程序的主要工作所在,因为它将服务所有来自客户端的请求。
我们将扩展 IoHandlerAdapter 类。这个类遵循了适配器设计模式,简化了需要为满足在一个类中传递实现了 IoHandler 接口的需求而要编写的代码量。
public class MinaTimeServer
{
public static void main( String[] args ) throws IOException
{
IoAcceptor acceptor = new NioSocketAcceptor();
acceptor.getFilterChain().addLast( "logger", new LoggingFilter() );
acceptor.getFilterChain().addLast( "codec", new ProtocolCodecFilter( new TextLineCodecFactory( Charset.forName( "UTF-8" ))));
acceptor.setHandler( new TimeServerHandler() );
acceptor.bind( new InetSocketAddress(PORT) );
}
}
TimeServerHandler
public class TimeServerHandler extends IoHandlerAdapter {
@Override
public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
cause.printStackTrace();
}
@Override
public void messageReceived(IoSession session, Object message) throws Exception {
String str = message.toString();
if( str.trim().equalsIgnoreCase("quit") ) {
session.close();
return;
}
Date date = new Date();
session.write( date.toString() );
System.out.println("Message written...");
}
@Override
public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
System.out.println( "IDLE " + session.getIdleCount( status ));
}
}
exceptionCaught
exceptionCaught应该总是在处理器中进行定义,以处理正常的远程连接过程时抛出的异常。如果这一方法没有定义,可能无法正常报告异常。
messageReceived
messageReceived方法会从客户端接收数据并将当前时间回写给客户端。如果接收自客户端的消息是单词 “quit”,那么当前 session 将被关闭。
取决于你所使用的协议编解码器,传递到这一方法的对象 (第二个参数) 会有所不同,就和你传给 session.write(Object) 方法的对象一样。如果你不定义一个协议编码器,你很可能会接收到一个 IoBuffer 对象,而且被要求写出一个 IoBuffer 对象。
sessionIdle
一旦 session 保持空闲状态到达 acceptor.getSessionConfig().setIdleTime( IdleStatus.BOTH_IDLE, 10 ); 所定义的时间长度,sessionIdle 方法会被调用。
session配置
现在我们对 NioSocketAcceptor 中的配置进行添加。这将允许我们为用于接收客户端连接的 socket 进行 socket 特有的设置。
public class MinaTimeServer
{
public static void main( String[] args ) throws IOException
{
IoAcceptor acceptor = new NioSocketAcceptor();
acceptor.getFilterChain().addLast( "logger", new LoggingFilter() );
acceptor.getFilterChain().addLast( "codec", new ProtocolCodecFilter( new TextLineCodecFactory( Charset.forName( "UTF-8" ))));
acceptor.setHandler( new TimeServerHandler() );
acceptor.getSessionConfig().setReadBufferSize( 2048 );
acceptor.getSessionConfig().setIdleTime( IdleStatus.BOTH_IDLE, 10 );
acceptor.bind( new InetSocketAddress(PORT) );
}
}
inaTimeServer 类中新加了两行。这些方法设置了 IoHandler,为 session 设置了输入缓冲区大小以及 idle 属性。指定缓冲区大小以通知底层操作系统为传入的数据分配多少空间。第二行指定了什么时候检查空闲 session。
在对 setIdleTime 的调用中,第一个参数定义了再断定 session 是否闲置时要检查的行为,第二个参数定义了在 session 被视为空闲之前以毫秒为单位的时间长度内必须发生。
运行server
剩下的工作就是定义服务器端将要监听的 socket 地址,并进行启动服务的调用。代码如下所示:
public class MinaTimeServer
{
private static final int PORT = 9123;
public static void main( String[] args ) throws IOException
{
IoAcceptor acceptor = new NioSocketAcceptor();
acceptor.getFilterChain().addLast( "logger", new LoggingFilter() );
acceptor.getFilterChain().addLast( "codec", new ProtocolCodecFilter( new TextLineCodecFactory( Charset.forName( "UTF-8" ))));
acceptor.setHandler( new TimeServerHandler() );
acceptor.getSessionConfig().setReadBufferSize( 2048 );
acceptor.getSessionConfig().setIdleTime( IdleStatus.BOTH_IDLE, 10 );
acceptor.bind( new InetSocketAddress(PORT) );
}
}
尝试给链接server
telnet 127.0.0.1 9123
结果:
客户端
Trying 127.0.0.1…
Connected to 127.0.0.1.
Escape character is ‘^]’.
hello
Wed Oct 17 23:23:36 EDT 2007
quit
Connection closed by foreign host.
user@myhost:~> MINA Time server started.
服务端
MINA Time server started.
Message written…
配置TCP Client
要构建一个客户端,我们需要做以下事情:
- 创建一个 Connector
- 创建一个 Filter Chain
- 创建一个 IOHandler 并添加到 Connector
- 绑定到服务器
创建一个 Connector
NioSocketConnector connector = new NioSocketConnector();
现在,我们已经创建了一个 NIO Socket 连接器
创建一个 Filter Chain
if (USE_CUSTOM_CODEC) {
connector.getFilterChain().addLast("codec",
new ProtocolCodecFilter(new SumUpProtocolCodecFactory(false)));
} else {
connector.getFilterChain().addLast("codec",
new ProtocolCodecFilter(new ObjectSerializationCodecFactory()));
}
我们为 Connector 的 Filter Chain 添加了一些过滤器。这里我们添加的是一个 ProtocolCodec 到过滤器链。
绑定到服务器
IoSession session;
for (;;) {
try {
ConnectFuture future = connector.connect(new InetSocketAddress(HOSTNAME, PORT));
future.awaitUninterruptibly();
session = future.getSession();
break;
} catch (RuntimeIoException e) {
System.err.println("Failed to connect.");
e.printStackTrace();
Thread.sleep(5000);
}
}
是最重要的部分。我们将连接到远程服务器。因为是异步连接任务,我们使用了 ConnectFuture 来了解何时连接完成。一旦连接完成,我们将得到相关联的 IoSession。要向服务器端发送任何消息,我们都要写入 session。所有来自服务器端的响应或者消息都将穿越 Filter chain 并最终由 IoHandler 处理。
译者注:翻译版本的项目源码见
https://github.com/waylau/apache-mina-2-user-guide-demos 中的
com.waylau.mina.demo.sumup包下
配置UDP Server
现在我们看一下
org.apache.mina.example.udp 包里的代码。简单起见,我们将只专注于 MINA 相关构建方面的东西。
要构建服务器我们需要做以下事情:
- 创建一个 Datagram Socket 以监听连入的客户端请求 (参考 MemoryMonitor.java)
- 创建一个 IoHandler 以处理 MINA 框架生成的事件 (参考 MemoryMonitorHandler.java)
Datagram Socket
这里是第 1 点提到的一些代码片段:
NioDatagramAcceptor acceptor = new NioDatagramAcceptor();
acceptor.setHandler(new MemoryMonitorHandler(this));
这里我们创建了一个 NioDatagramAcceptor 以监听连入的客户端请求,并设置了 IoHandler。”PORT” 是一整型变量。下一步将要为这一 DatagramAcceptor 使用的过滤器链添加一个日志过滤器。LoggingFilter 实在 MINA 中表现不错的一个选择。它在不同阶段产生日志事务,以为我们观察 MINA 的工作情况。
DefaultIoFilterChainBuilder chain = acceptor.getFilterChain();
chain.addLast("logger", new LoggingFilter());
接下来我们来看一些更具体的 UDP 传输的代码。我们设置 acceptor 以复用地址:
DatagramSessionConfig dcfg = acceptor.getSessionConfig();
dcfg.setReuseAddress(true);
acceptor.bind(new InetSocketAddress(PORT));
当然,要做的最后一件事就是调用 bind()。
IoHandler 实现
对于我们服务器实现有三个主要事件:
- Session 创建
- Message 接收
- Session 关闭
我们分别看看它们的具体细节。
Session 创建事件
@Override
public void sessionCreated(IoSession session) throws Exception {
SocketAddress remoteAddress = session.getRemoteAddress();
server.addClient(remoteAddress);
}
在这个 session 创建事件中,我们调用了 addClient() 方法,它为界面添加了一个选项卡。
Message 收到事件s
@Override
public void messageReceived(IoSession session, Object message) throws Exception {
if (message instanceof IoBuffer) {
IoBuffer buffer = (IoBuffer) message;
SocketAddress remoteAddress = session.getRemoteAddress();
server.recvUpdate(remoteAddress, buffer.getLong());
}
}
在这个消息接收到事件中,我们对接收到的消息中的数据进行了处理。需要发送返回的应用,可以在这个方法中处理消息并回写响应。
Session 关闭事件
@Override
public void sessionClosed(IoSession session) throws Exception {
System.out.println("Session closed...");
SocketAddress remoteAddress = session.getRemoteAddress();
server.removeClient(remoteAddress);
}
配置UDP Client
客户端的实现需要做的事情:
- 创建套接字并连接到服务器端
- 设置 IoHandler
- 收集空闲内存
- 发送数据到服务器端
现在我们看一下
org.apache.mina.example.udp.client 包中的 MemMonClient.java。前几行代码简单明了:
connector = new NioDatagramConnector();
connector.setHandler( this );
ConnectFuture connFuture = connector.connect( new InetSocketAddress("localhost", MemoryMonitor.PORT ));
我们创建了一个 NioDatagramConnector,设置了处理器然后连接到服务器。
我曾经落入的一个陷阱是,你必须在 InetSocketAddress 对象中设置主机,否则它将什么也不干。
这个例子是在一台 Windows XP 主机上编写并测试,因此在其他环境中可能会有所不同。解析来我们将等待客户端连接到的主机的确认。一旦得知我们已经建立连接,我们就可以开始向服务器端写数据了:
connFuture.addListener( new IoFutureListener(){
public void operationComplete(IoFuture future) {
ConnectFuture connFuture = (ConnectFuture)future;
if( connFuture.isConnected() ){
session = future.getSession();
try {
sendData();
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
log.error("Not connected...exiting");
}
}
});
这里我们为 ConnectFuture 对象添加了一个监听者,当我们接收到客户端已建立连接的回调时,我们就可以写数据了。向服务器端写数据将会由一个叫做 sendData 的方法处理。这个方法如下所示:
private void sendData() throws InterruptedException {
for (int i = 0; i < 30; i++) {
long free = Runtime.getRuntime().freeMemory();
IoBuffer buffer = IoBuffer.allocate(8);
buffer.putLong(free);
buffer.flip();
session.write(buffer);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
throw new InterruptedException(e.getMessage());
}
}
}
这个方法将在 30 秒之内的每秒钟向服务器端发送一次空闲内存的数量。在这里你可以看到我们分配了一个足够大的 IoBuffer 来保存一个 long 类型变量,然后将空闲内存的数量放进缓存。缓冲随即写给服务器端。
UDP Client 实现完成。
IoService接口
MINA IoService - 正如前面应用架构里提到过的,是支持所有 IO 服务的基类,不管是在服务器端还是在客户端。
它将处理所有与你的应用之间的交互,以及与远程对端的交互,发送并接收消息、管理 session、管理连接等等。
它是为一个接口,服务器端实现为 IoAcceptor,客户端为 IoConnector。
IOService脑图
分析
由图中可以看出,IoService 承担着很多职责:
- session 管理:创建和删除 session,检测闲置 session
- 过滤器链管理:操纵过滤器链,允许用户运行中改变过滤器链
- 处理器调用:接收到新消息时调用处理器,等等
- 统计管理:更新发送消息的数量、发送字节的数量,等等
- 监听器管理:管理用户创建的监听器
- 通信管理:在服务器端和服务器端处理传输的数据
IoService 是一个接口,它被 MINA 中最重要的两个类实现:
- IoAcceptor
- IoConnector
要构建一个服务器,你需要选择一个 IoAcceptor 接口的实现。对于客户端应用,你需要选择一个 IoConnector 接口的实现。
IoAcceptor
根本上讲,IoAcceptor 接口是因为 accept() 方法的缘故所命名,这个方法负责客户端和服务器端连接的创建。服务器端接收连入的连接请求。
因为我们可能要应对不止一种类型的传输协议 (TCP/UDP/…),我们为这一接口提供了多个实现。不太可能需要你再实现一个新的。
我们具有以下具体实现类:
- NioSocketAcceptor:非阻塞套接字传输 IoAcceptor
- NioDatagramAcceptor:非阻塞 UDP 传输 IoAcceptor
- AprSocketAcceptor:基于 APR 的阻塞套接字传输 IoAcceptor
- VmPipeSocketAcceptor:in-VM IoAcceptor
这里是 IoAcceptor 接口和类的类图:
创建示例
public TcpServer() throws IOException {
// Create a TCP acceptor
IoAcceptor acceptor = new NioSocketAcceptor();
// Associate the acceptor to an IoHandler instance (your application)
acceptor.setHandler(this);
// Bind : this will start the server...
acceptor.bind(new InetSocketAddress(PORT));
System.out.println("Server started...");
}
就是这样!你已经创建了一个 TCP 服务器。如果你想要启动一个 UDP 服务器,只需替换掉第一行代码:
...// Create an UDP acceptorIoAcceptor acceptor = new NioDatagramAcceptor();...
销毁
服务可以通过调用 dispose() 方法进行停止。服务只能在所有等待中的 session 都被处理之后才能停止:
// Stop the service, waiting for the pending sessions to be inactive
acceptor.dispose();
你也可以通过传递给这个方法一个布尔类型的参数等待每一个执行中的线程正常结束:
// Stop the service, waiting for the processing session to be properly completed
acceptor.dispose( true );
状态
你可以通过调用以下方法之一拿到 IoService 的状态:
- isActive():如果服务能够接受连入请求返回 true
- isDisposing():如果 dispose() 方法已被调用返回 true。这并不能说明服务已经停止 (可能还有一些 session 正在处理中)
- isDisposed():如果 dispose(boolean) 方法已被调用、并且执行中的线程已经结束,返回 true
管理 IoHandler
当服务实例化之后你可以添加或者获取其关联到的 IoHandler。你只需要去调用一把 setHandler(IoHandler) 或者 getHandler() 方法。
管理过滤器链
如果你想要管理过滤器链,你得调用一把 getFilterChain() 方法,如下所示:
// Add a logger filterDefaultIoFilterChainBuilder
chain = acceptor.getFilterChain();
chain.addLast("logger", new LoggingFilter());
你也可以在将过滤器链设置进服务之前先创建它:
// Add a logger filter
DefaultIoFilterChainBuilder chain = new DefaultIoFilterChainBuilder();
chain.addLast("logger", new LoggingFilter());
// And inject the created chain builder in the service
acceptor.setFilterChainBuilder(chain);
IoConnector
我们需要为客户端实现 IoConnector。我们提供了以下具体实现类:
- NioSocketConnector:非阻塞套接字传输 IoConnector
- NioDatagramConnector:非阻塞 UDP 传输 IoConnector
- AprSocketConnector:基于 APR 的阻塞套接字传输 IoConnector
- ProxyConnector:一个提供代理支持的 IoConnector
- SerialConnector:一个用于串行传输的 IoConnector
- VmPipeConnector:in-VM IoConnector
你只需挑选一个适合你需要的。
这里是 IoConnector 接口和类的类图:
Session
Session(会话)是 MINA 的核心。每当一个客户端连接到服务器,一个新的会话会被创建,并会在客户端关掉连接前一直保存在内存中。
会话用于保存连接的持久信息,以及在请求处理过程中、会话的生命周期中服务器可能需要用到的任何信息。
会话的状态
会话会有一个随着时间演变的状态:
- 已连接:会话已被创建并可用
- 闲置:会话在至少一段时间 (这段时间是可配的) 内没有处理任何请求读闲置:在一段时间内没有任何读操作写闲置:在一段时间内没有任何写操作同时闲置:在一段时间内既没有读操作也没有写操作
- 关闭中:会话正在关闭中 (还有正在清空的消息,清理尚未结束)
- 已关闭:会话现在已被关闭,没有其他方法可以将其恢复。
以下状态图揭示了所有可能的状态及其转换:
idle 闲置的
配置
对于特定会话可以设置以下不同的参数:
- 接收缓冲大小
- 发送缓冲大小
- 空闲时间
- 写超时时间
另外还有一些其他配置,取决于你所用的传输类型 (参考Chapter 6 - Transports)。
管理用户定义的属性
有可能需要存储一些以后可能会用到的数据。这个使用每个会话关联到的专用数据结构来实现。这是一个键值对组合,它可以存储开发人员可能希望保存的任何类型的数据。
例如,如果你想跟踪会话创建以后用户已发送的请求次数,把它放进集合很容易:只需要创建将要关联到这个值的键即可。
...
int counterValue = session.getAttribute( "counter" );
session.setAttribute( "counter", counterValue + 1 );
...
这样就有了将保存属性放进会话的方法:属性就是一个键值对,它可以从会话的容器中添加、删除以及读取。
容器在会话创建时会被自动创建,而在会话结束时会被清理。
定义容器
如上所述,容器是存储键值对的容器,默认情况下是一个 Map,但是也可以定义做其他的数据结构,如果你想处理长寿命周期的数据,或者避免将所有大数据都存储在内存:我们可以实现一个接口和一个工厂用于在会话建立时创建容器。
以下代码演示了会话初始化时容器的创建:
protected final void initSession(IoSession session,
IoFuture future, IoSessionInitializer sessionInitializer) {
...
try {
((AbstractIoSession) session).setAttributeMap(session.getService()
.getSessionDataStructureFactory().getAttributeMap(session));
} catch (IoSessionInitializationException e) {
throw e;
} catch (Exception e) {
throw new IoSessionInitializationException(
"Failed to initialize an attributeMap.", e);
}
...
如果我们想要定义其他类型的容器,这里是我们可以实现的工厂接口:
public interface IoSessionDataStructureFactory {
/**
* Returns an {@link IoSessionAttributeMap} which is going to be associated
* with the specified session. Please note that the returned
* implementation must be thread-safe.
*/
IoSessionAttributeMap getAttributeMap(IoSession session) throws Exception;
}
过滤器链
每个会话会关联到一个过滤器链,在连入一个请求或者接收/发出消息时这个过滤器链将会对其处理。这些过滤器针对于每个每个单独的会话,在多数情况下,会为所有现有会话使用同一个过滤器链。
但是,也可以为单个会话动态修改过滤器链,比如为指定的会话的过滤器链添加一个日志过滤器。
统计
每个会话也会保持对会话处理结束的一些记录的跟踪:
- 接收或发送的字节数
- 接收或发送的消息数
- 闲置状态
- 吞吐量
以及其他一些有用的信息。
处理器
最后,并非最不重要的,每个会话会被附加到一个处理器,该处理器负责调度给你的应用的消息。这个处理器也会通过使用会话发送响应包,只需调用 write() 方法:
...
session.write( );
...
filter
IoFilter 扮演着很重要角色,它是 MINA 的核心结构之一。
它过滤 IoService 和 IoHandler 之间的所有 I/O 事件和请求。
以下的开箱即用过滤器简化横切注入用来提升网络应用的开发速度:
- LoggingFilter 记录所有事件和请求
- ProtocolCodecFilter 将一个连入的 ByteBuffer 转化为消息 POJO,反之亦然
- CompressionFilter 压缩所有数据
- SSLFilter 添加 SSL - TLS - StartTLS 支持
- 更多!
过滤器 | 类 | 描述 |
Blacklist | BlacklistFilter | 阻止列入黑名单的远程地址的连接 |
Buffered Write | BufferedWriteFilter | 缓存输出请求,就像 BufferedOutputStream 所做的那样 |
Compression | CompressionFilter | |
ConnectionThrottle | ConnectionThrottleFilter | |
ErrorGenerating | ErrorGeneratingFilter | |
Executor | ExecutorFilter | |
FileRegionWrite | FileRegionWriteFilter | |
KeepAlive | KeepAliveFilter | |
Logging | LoggingFilter | 日志事件消息,比如 MessageReceived、MessageSent、SessionOpened 等等 |
MDC Injection | MdcInjectionFilter | 将关键 IoSession 属性注入 MDC |
Noop | NoopFilter | 一个不作任何事情的过滤器。用于测试。 |
Profiler | ProfilerTimerFilter | 分析事件消息,比如 MessageReceived、MessageSent、SessionOpened 等等 |
ProtocolCodec | ProtocolCodecFilter | 负责对消息进行编码和解码的过滤器 |
Proxy | ProxyFilter | |
Reference counting | ReferenceCountingFilter | 跟踪本过滤器的使用次数 |
RequestResponse | RequestResponseFilter | |
SessionAttributeInitializing | SessionAttributeInitializingFilter | |
StreamWrite | StreamWriteFilter | |
SslFilter | SslFilter | |
WriteRequest | WriteRequestFilter |
选择性重写事件
你可以对 IoAdapter 重写以取代直接实现 IoFilter 的做法。除非重写,否则所有接收到的事件将被直接转发给下一个过滤器。
jpublic class MyFilter extends IoFilterAdapter {
@Override
public void sessionOpened(NextFilter nextFilter, IoSession session) throws Exception {
// Some logic here...
nextFilter.sessionOpened(session);
// Some other logic here...
}
}
转换写请求
果你要通过 IoSession.write() 对一个连入的写请求进行转换,事情可能会变得相当棘手。例如,假设你的过滤器在一个 HighLevelMessage 对象调用 IoSession.write() 时将 HighLevelMessage 转换为 LowLevelMessage。你可以在你的过滤器的 filterWrite() 方法里添加适当的转换代码并认为一切就这样了。但是你也需要注意 messageSent 事件,因为一个 IoHandler 或者任何之后的过滤器会期望 messageSent() 方法以 HighLevelMessage 作为参数调用,因为让调用者在写 HighLevelMessage 的时候被通知到 HighLevelMessage 已发送是不合理的。
因此,如果你的过滤器负责转换时你最好同时实现 filterWrite() 和 messageSent()。
假定你在实现一个将字符串转换为字符数组的过滤器。你的过滤器的 filterWrite() 将会类似于:
public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest request) {
nextFilter.filterWrite(
session, new DefaultWriteRequest(
((String) request.getMessage()).toCharArray(), request.getFuture(), request.getDestination()));
}
现在我们需要在 messageSent() 中做相反的事情:
public void messageSent(NextFilter nextFilter, IoSession session, Object message) {
nextFilter.messageSent(session, new String((char[]) message));
}
字符串到字节缓存的转换怎么样?这样我们会更加高效,我们不在需要重建原始消息 (字符串)。但是,这比前面的例子复杂:
public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest request) {
String m = (String) request.getMessage();
ByteBuffer newBuffer = new MyByteBuffer(m, ByteBuffer.wrap(m.getBytes());
nextFilter.filterWrite(
session, new WriteRequest(newBuffer, request.getFuture(), request.getDestination()));
}
public void messageSent(NextFilter nextFilter, IoSession session, Object message) {
if (message instanceof MyByteBuffer) {
nextFilter.messageSent(session, ((MyByteBuffer) message).originalValue);
} else {
nextFilter.messageSent(session, message);
}
}
private static class MyByteBuffer extends ByteBufferProxy {
private final Object originalValue;
private MyByteBuffer(Object originalValue, ByteBuffer encodedValue) {
super(encodedValue);
this.originalValue = originalValue;
}
}
sessionCreated
sessionCreated 是一个特殊事件,它必须在 I/O 处理程序中执行 (参考 线程模型的配置)。决不允许将 sessionCreated 事件转发给其他线程。
public void sessionCreated(NextFilter nextFilter, IoSession session) throws Exception {
// ...
nextFilter.sessionCreated(session);
}
// 不要这么干
public void sessionCreated(final NextFilter nextFilter, final IoSession session) throws Exception {
Executor executor = ...;
executor.execute(new Runnable() {
nextFilter.sessionCreated(session);
});
}
小心空缓存!
在一些案例中 MINA 使用了一个空的缓冲区作为一个内部信号。空缓存有时会成为一个问题,因为它可能会造成各种各样的异常,比如 IndexOutOfBoundsException。这里我们介绍如何避免类似于这种难以预料的情况。
ProtocolCodecFilter 使用了一个空缓存 (比如 buf.hasRemaining() = 0) 来标记消息的结束部分。如果你的过滤器放在 ProtocolCodecFilter 之前,如果你的过滤器实现在缓存为空时能抛出一个异常的话,请确认你的过滤器将空缓存转发给了下一个过滤器:
public void messageSent(NextFilter nextFilter, IoSession session, Object message) {
if (message instanceof ByteBuffer && !((ByteBuffer) message).hasRemaining()) {
nextFilter.messageSent(nextFilter, session, message);
return;
}
...
}
public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest request) {
Object message = request.getMessage();
if (message instanceof ByteBuffer && !((ByteBuffer) message).hasRemaining()) {
nextFilter.filterWrite(nextFilter, session, request);
return;
}
...
}
这样的话,我们是否总是要为每个过滤器插入 if 块?幸运的是,不需要。这是处理空缓存的黄金法则:
- 如果你的过滤器及时在缓存为空时也能正常工作,你就完全不需要添加 if 块了
- 如果你的过滤器放在 ProtocolCodecFilter 之后,你也不需要添加 if 块
- 除此之外的话你就需要 if 块了
APR传输
APR (Apache Portable Runtime) 提供了更好的扩展性、性能以及更好的与本地服务器技术的集成。MINA 照常 APR 传输。现在我们将了解如何使用 MINA 进行 APR 传输。我们将为此使用时间服务器的例子。
先决条件
APR 传输取决于以下组件
APR 库 - 从
http://www.apache.org/dist/tomcat/tomcat-connectors/native/ 为你的平台下载并安装适当的库
JNI 包装 (tomcat-apr-5.5.23.jar) 这个 jar 附带于在发布版中
把本地库放在环境变量中
访问 时间服务器 例子以获取完整源代码。
译者注:翻译版本的项目源码见
https://github.com/waylau/apache-mina-2-user-guide-demos 中的com.waylau.mina.demo.time包下
现在看一下基于 NIO 的时间服务器应用:
IoAcceptor acceptor = new NioSocketAcceptor();
acceptor.getFilterChain().addLast( "logger", new LoggingFilter() );
acceptor.getFilterChain().addLast( "codec", new ProtocolCodecFilter( new TextLineCodecFactory( Charset.forName( "UTF-8" ))));
acceptor.setHandler( new TimeServerHandler() );
acceptor.getSessionConfig().setReadBufferSize( 2048 );
acceptor.getSessionConfig().setIdleTime( IdleStatus.BOTH_IDLE, 10 );
acceptor.bind( new InetSocketAddress(PORT) );
然后看一下如何使用 APR 传输:
IoAcceptor acceptor = new AprSocketAcceptor();
acceptor.getFilterChain().addLast( "logger", new LoggingFilter() );
acceptor.getFilterChain().addLast( "codec", new ProtocolCodecFilter( new TextLineCodecFactory( Charset.forName( "UTF-8" ))));
acceptor.setHandler( new TimeServerHandler() );
acceptor.getSessionConfig().setReadBufferSize( 2048 );
acceptor.getSessionConfig().setIdleTime( IdleStatus.BOTH_IDLE, 10 );
acceptor.bind( new InetSocketAddress(PORT) );
我们只是将 NioSocketAcceptor 换成了 AprSocketAcceptor,仅仅如此,我们的时间服务器就是使用 APR 传输了。
其他完成过程保持不变。
串行传输
MINA 所提供的串行通信只有一个 IoConnector,根据点对点通信媒体的性质。
你需要一个 SerialConnector 以连接到一个串行端口:
// create your connector
IoConnector connector = new SerialConnector()
connector.setHandler( ... here your buisness logic IoHandler ... );
除了 SocketConnector 之外没啥不同的。
现在为连接到我们的串行端口创建一个地址:
SerialAddress portAddress=new SerialAddress( "/dev/ttyS0", 38400, 8, StopBits.BITS_1, Parity.NONE, FlowControl.NONE );
第一个参数是你的端口标识。对于 Windows 系统的电脑,串行端口被叫做 “COM1”、”COM2” 等等…对于 Linux 和其他 Unix 系统:”/dev/ttyS0”、”/dev/ttyS1”、”/dev/ttyUSB0”。
其他参数取决于你的设备和通信特性。
- 波特率
- 数据位
- 奇偶性
- 流控制机制
这个完成之后,将连接器连接到相应地址:
ConnectFuture future = connector.connect( portAddress );future.await();IoSession sessin = future.getSession();
就这些!其他照常,你可以插进你的过滤器和编解码器。更多信息请参考 RS232:
http://en.wikipedia.org/wiki/RS232。
Handler
处理 MINA 所触发 I/O 事件。这一接口时在过滤器链最后完成的所有活动的中心。
IoHandler 具有以下方法:
- sessionCreated
- sessionOpened
- sessionClosed
- sessionIdle
- exceptionCaught
- messageReceived
- messageSent
sessionCreated
会话建立事件在一个新的连接被创建时触发。对于 TCP 来说这是连接接受的结果,而对于 UDP 这个在接收到一个 UDP 包时产生。这一方法可以被用于初始化会话属性,并为一些特定连接执行一次性活动。
个方法由 I/O 处理线程的环境中调用,因此应该以一个低耗时的方式实现,因为同一个线程要处理多个会话。
sessionOpened
会话打开事件是在一个连接被打开时调用。它总是在 sessionCreated 事件之后调用。如果配置了一个线程模型,这一方法将在 I/O 处理线程之外的一个线程中调用。
sessionClosed
会话关闭事件在会话被关闭时调用。会话清理活动比如清理支付信息可以在这里执行。
sessionIdle
会话空闲时间在会话变为闲置状态时触发。这一方法并不为基于 UDP 的传输调用。
exceptionCaught
这一方法在用户代码或者 MINA 抛异常时调用。如果是一个 IOException 异常的话当前连接将被关闭。
messageReceived
消息接收事件在一个消息被接收到时触发。这是一个应用最常发生的处理。你需要照顾到所有要碰到的消息类型
messageSent
消息发送事件在消息响应被发送 (调用 IoSession.write()) 时触发。
MINA-Core
IoBuffer 操作
分配一个新缓存
IoBuffer 是个抽象类,因此不能够直接被实例化。要分配 IoBuffer,我们需要使用两个 allocate() 方法中的其中一个。
// Allocates a new buffer with a specific size, defining its type (direct or heap)
public static IoBuffer allocate(int capacity, boolean direct)
// Allocates a new buffer with a specific size
public static IoBuffer allocate(int capacity)
allocate() 方法具有一个或两个参数。第一种方式具有两个参数:
- capacity - 缓存的容量
- direct - 缓存的类型。true 的话获得直接缓存,false 的话获取堆缓存
默认的缓存分配由 SimpleBufferAllocator 进行处理。
作为一个选择,也可以使用以下形式:
// Allocates heap buffer by default.
IoBuffer.setUseDirectBuffer(false);
// A new heap buffer is returned.
IoBuffer buf = IoBuffer.allocate(1024);
当使用第二种形式的时候,别忘了先设置以下默认的缓存类型,否则你将被默认获得堆缓存。
创建自动扩展缓存
使用 Java NIO API 创建自动扩展缓存不是一件容易的事情,因为其缓存大小是固定的。有一个缓冲区,可以根据需要自动扩展是网络应用程序的一大亮点。为解决这个,IoBuffer 引入了 autoExpand 属性。它可以自动扩大容量和限值。
我们看一下如何创建一个自动扩展的缓存:
IoBuffer buffer = IoBuffer.allocate(8);
buffer.setAutoExpand(true);
buffer.putString("12345678", encoder);
// Add more to this buffer
buffer.put((byte)10);
潜在的 ByteBuffer 会被 IoBuffer 在幕后进行分配,如果上面例子中的编码信息大于 8 个字节的话。其容量将会加倍,而且其上限将会增加到字符串最后写入的位置。这行为很像 StringBuffer 类的工作方式。
这种机制很可能要在 MINA 3.0 中被移除,因为它其实并非最好的处理增加缓存大小的方法。它可能会被其它方案代替,像隐藏了一个列表的 InputStream,或者一个装有一系列固定容量的 ByteBuffer 的数组。
创建自动收缩的缓存
还有一些机制释放缓存中多余分配的字节,以保护内存。
IoBuffer 提供了 autoShrink 属性以满足这一需要。如果打开 autoShrink,当 compact() 被调用时 IoBuffer 将缓存分为两半,只有 1/4 或更少的当前容量正在被使用。要手工减少缓存的话使用 shrink() 方法。
可以用例子对此进行验证:
IoBuffer buffer = IoBuffer.allocate(16);
buffer.setAutoShrink(true);
buffer.put((byte)1);
System.out.println("Initial Buffer capacity = "+buffer.capacity());
buffer.shrink();
System.out.println("Initial Buffer capacity after shrink = "+buffer.capacity());
buffer.capacity(32);
System.out.println("Buffer capacity after incrementing capacity to 32 = "+buffer.capacity());
buffer.shrink();
System.out.println("Buffer capacity after shrink= "+buffer.capacity());
我们初始化分配的容量是为 16,并且将 autoShrink 属性设为 true。
我们看一下它的输出
Initial Buffer capacity = 16
Initial Buffer capacity after shrink = 16
Buffer capacity after incrementing capacity to 32 = 32
Buffer capacity after shrink= 16
现在停下来分析输出:
- 初始化缓存容量是为 16,因为我们使用的这一容量创建了缓存。内部实现使得这成为这个缓存容量的最小值
- 调用 shrink() 之后,容量保持 16,因为实际容量永不会比最小容量更小
- 当扩充容量为 32 之后,容量变为 32
- 调用 shrink(),容量被缩减为 16,从而消除了额外的存储
再次声明,这种机制是默认的,不需要显式告诉缓存它能够缩减。
缓存分配
IoBufferAllocater 负责分配并管理缓存。要获取堆缓存分配的精确控制,你需要实现 IoBufferAllocater 接口。
MINA 具有以下 IoBufferAllocater 实现:
- SimpleBufferAllocator (默认) - 每次创建一个新的缓存
- CachedBufferAllocator - 对扩展中可能会被复用的缓存进行高速缓存
对于最新的 JVM,使用高速缓存的 IoBuffer 不太可能提高性能。
你可以实现你自己的 IoBufferAllocator 并通过调用对 IoBuffer 的 setAllocator() 来做同样的事情。
编解码器过滤器
为什么使用 ProtocolCodecFilter?
- TCP 担保以正确的顺序交付所有数据包。但是没有担保对于在发送端写操作时影响到接收端的读事件。
- 大多数网络应用需要一种方法以找出当前消息的结束位置和下一条消息的起始位置。
- 你完全可以在你的 IoHandler 实现所有的这些逻辑,但是添加一个 ProtocolCodecFilter 可以让你的代码更加清晰并容易维护。
- 它将你的协议逻辑从你的业务逻辑中 (IoHandler) 剥离开来。
如何使用?
你的应用接收到的基本上是一串字节,你需要将这些字节转换为消息 (高层对象)。
有三种常见的技术用来将字节流分割为消息:
- 使用固定长度的字节
- 使用固定长度的报头来指示出报体的长度
- 使用定界符。例如,许多基于文本的协议在每条消息 (http://www.faqs.org/rfcs/rfc977.html) 后面紧跟一个新的空行 (或者 CR LF 对)。
本文将使用第一种和第二种方法,因为它们很明显更容易实现。然后我们再看一下使用定界符的做法。
我们将会开发一个 (毫无用处的) 图形字符发生器协议服务器来说明如何实现你自己协议的编解码器 (ProtocolEncoder、ProtocolDecoder 和 ProtocolCodecFactory)。这个协议是很简单的。
我们需要的用于请求和响应的编码和解码的类的概述:
- ImageRequest:一个表示对我们的图片服务器请求的简单的 POJO
- ImageRequestEncoder:将 ImageRequest 对象编码为协议专用数据 (由客户端使用)
- ImageRequestDecoder:将协议专用数据解码为 ImageRequest 对象 (由服务器端使用)
- ImageResponse:一个表示来自我们图片服务器端的响应的简单的 POJO
- ImageResponseEncoder:服务器端用以对 ImageResponse 对象编码的类
- ImageResponseDecoder:客户端用以对 ImageResponse 对象解码的类
- ImageCodecFactory:这个类负责创建需要的编码器和解码器
示例
ImageRequest
类源代码如下:
public class ImageRequest {
private int width;
private int height;
private int numberOfCharacters;
public ImageRequest(int width, int height, int numberOfCharacters) {
this.width = width;
this.height = height;
this.numberOfCharacters = numberOfCharacters;
}
public int getWidth() {
return width;
}
public int getHeight() {
return height;
}
public int getNumberOfCharacters() {
return numberOfCharacters;
}
}
ImageRequestEncoder
编码往往比解码容易,因此我们先从 ImageRequestEncoder 开始:
public class ImageRequestEncoder implements ProtocolEncoder {
public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
ImageRequest request = (ImageRequest) message;
IoBuffer buffer = IoBuffer.allocate(12, false);
buffer.putInt(request.getWidth());
buffer.putInt(request.getHeight());
buffer.putInt(request.getNumberOfCharacters());
buffer.flip();
out.write(buffer);
}
public void dispose(IoSession session) throws Exception {
// nothing to dispose
}
}
MINA 将会为所有在 IoSession 的写队列里的消息调用编码方法。既然我们的客户端只会写 ImageRequest 对象,我们可以安全地把消息放进 ImageRequest
我们在堆空间分配了一个新的 IoBuffer。最好避免使用直接缓存,因为通常堆缓存表现的更好参考
http://issues.apache.org/jira/browse/DIRMINA-289
不需要你去释放缓存,MINA 为你代劳了。参考
http://mina.apache.org/mina-project/apidocs/org/apache/mina/core/buffer/IoBuffer.html
你应该在 dispose() 方法中释放所有在为特定会话编码时所获取的资源。如果没有任何事情要处理你可以让你的编码器继承自 ProtocolEncoderAdapter
ImageRequestDecoder
现在我们来看一下解码器。CumulativeProtocolDecoder 绝对对写你自己的编码器有很大帮助:它将把你的解码器决定对连入数据可以做一些事情之前都缓存起来。在这种情况下消息具有固定大小,因此很容易等待所有的数据到齐以后再进行一些操作:
public class ImageRequestDecoder extends CumulativeProtocolDecoder {
protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
if (in.remaining() >= 12) {
int width = in.getInt();
int height = in.getInt();
int numberOfCharachters = in.getInt();
ImageRequest request = new ImageRequest(width, height, numberOfCharachters);
out.write(request);
return true;
} else {
return false;
}
}
}
备注:
- 每次在对一个完整的消息编码时,你应该将其写进 ProtocolDecoderOutput;这些消息将穿过过滤器链并最终到达你的 IoHandler.messageReceived 方法
- 你不必负责释放 IoBuffer
- 如果没有足够的可用数据用以解码一条消息,只需返回 false
ImageResponse
public class ImageResponse {
private BufferedImage image1;
private BufferedImage image2;
public ImageResponse(BufferedImage image1, BufferedImage image2) {
this.image1 = image1;
this.image2 = image2;
}
public BufferedImage getImage1() {
return image1;
}
public BufferedImage getImage2() {
return image2;
}
}
ImageResponseEncoder
响应的编码也很简单:
public class ImageResponseEncoder extends ProtocolEncoderAdapter {
public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
ImageResponse imageResponse = (ImageResponse) message;
byte[] bytes1 = getBytes(imageResponse.getImage1());
byte[] bytes2 = getBytes(imageResponse.getImage2());
int capacity = bytes1.length + bytes2.length + 8;
IoBuffer buffer = IoBuffer.allocate(capacity, false);
buffer.setAutoExpand(true);
buffer.putInt(bytes1.length);
buffer.put(bytes1);
buffer.putInt(bytes2.length);
buffer.put(bytes2);
buffer.flip();
out.write(buffer);
}
private byte[] getBytes(BufferedImage image) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ImageIO.write(image, "PNG", baos);
return baos.toByteArray();
}
}
当无法提前预测 IoBuffer 的长度时,你可以通过调用 buffer.setAutoExpand(true); 使用一个自动扩展缓存
现在我们来看一下响应的解码:
ImageResponseDecoder
public class ImageResponseDecoder extends CumulativeProtocolDecoder {
private static final String DECODER_STATE_KEY = ImageResponseDecoder.class.getName() + ".STATE";
public static final int MAX_IMAGE_SIZE = 5 * 1024 * 1024;
private static class DecoderState {
BufferedImage image1;
}
protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
DecoderState decoderState = (DecoderState) session.getAttribute(DECODER_STATE_KEY);
if (decoderState == null) {
decoderState = new DecoderState();
session.setAttribute(DECODER_STATE_KEY, decoderState);
}
if (decoderState.image1 == null) {
// try to read first image
if (in.prefixedDataAvailable(4, MAX_IMAGE_SIZE)) {
decoderState.image1 = readImage(in);
} else {
// not enough data available to read first image
return false;
}
}
if (decoderState.image1 != null) {
// try to read second image
if (in.prefixedDataAvailable(4, MAX_IMAGE_SIZE)) {
BufferedImage image2 = readImage(in);
ImageResponse imageResponse = new ImageResponse(decoderState.image1, image2);
out.write(imageResponse);
decoderState.image1 = null;
return true;
} else {
// not enough data available to read second image
return false;
}
}
return false;
}
private BufferedImage readImage(IoBuffer in) throws IOException {
int length = in.getInt();
byte[] bytes = new byte[length];
in.get(bytes);
ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
return ImageIO.read(bais);
}
}
- 我们将解码过程的状态保存为一个会话属性。也可以将这一状态保存在解码器对象自身中,但这么干有一些缺点: 每个 IoSession 需要自己的解码器实例MINA 能够确保不会有多个线程同时为同一个 IoSession 执行 decode() 方法,但这并不能担保执行这个方法的总述同一个线程。假设第一块数据有线程 1 处理,线程 1 认为还不能解码,当下一块数据到达时,它可能会被另一个线程处理。为了避免可见性问题,你必须对这一解码状态进行适当加锁 (IoSession 的属性保存在一个 ConcurrentHashMap中,因此它们对于其他线程是自动可见的)在邮件列表中的一个讨论得出了这一结论:关于是把状态保存在 IoSession 还是解码器实例中的选择是一个很有趣的问题。要确保不会有两个以上的线程为同一个 IoSession 运行解码方法,MINA 需要做某种形式的同步 => 这一同步也可以保证你不会遇到上面描述的可见性问题。(感谢 Adam Fisk 指出这一点) 参考 http://www.nabble.com/Tutorial-on-ProtocolCodecFilter,-state-and-threads-t3965413.html
- IoBuffer.prefixedDataAvailable() 在你的协议事宜一个长度前缀时很是便利;它支持 1、2 或 4 个字节的前缀
- 在你解码一个响应的时候别忘了复位解码状态 (将该会话属性删除是解决这种问题的另一种方法)
如果响应只有单一的一个图片,我们就无需保存解码状态了:
protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
if (in.prefixedDataAvailable(4)) {
int length = in.getInt();
byte[] bytes = new byte[length];
in.get(bytes);
ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
BufferedImage image = ImageIO.read(bais);
out.write(image);
return true;
} else {
return false;
}
}
现在我们把它们都组合在一起:
public class ImageCodecFactory implements ProtocolCodecFactory {
private ProtocolEncoder encoder;
private ProtocolDecoder decoder;
public ImageCodecFactory(boolean client) {
if (client) {
encoder = new ImageRequestEncoder();
decoder = new ImageResponseDecoder();
} else {
encoder = new ImageResponseEncoder();
decoder = new ImageRequestDecoder();
}
}
public ProtocolEncoder getEncoder(IoSession ioSession) throws Exception {
return encoder;
}
public ProtocolDecoder getDecoder(IoSession ioSession) throws Exception {
return decoder;
}
}
备注:
- 对于每个新会话,MINA 将会请求 ImageCodecFactory 以获取一个编码器或者一个解码器
- 因为我们的编码器和解码器没有保存会话状态,因此让所有会话共享一个单一实例是安全的
服务端对ProtocolCodecFactory的使用
这里是服务器端对 ProtocolCodecFactory 的使用:
public class ImageServer {
public static final int PORT = 33789;
public static void main(String[] args) throws IOException {
ImageServerIoHandler handler = new ImageServerIoHandler();
NioSocketAcceptor acceptor = new NioSocketAcceptor();
acceptor.getFilterChain().addLast("protocol", new ProtocolCodecFilter(new ImageCodecFactory(false)));
acceptor.setLocalAddress(new InetSocketAddress(PORT));
acceptor.setHandler(handler);
acceptor.bind();
System.out.println("server is listenig at port " + PORT);
}
}
客户端的使用完全一致:
public class ImageClient extends IoHandlerAdapter {
public static final int CONNECT_TIMEOUT = 3000;
private String host;
private int port;
private SocketConnector connector;
private IoSession session;
private ImageListener imageListener;
public ImageClient(String host, int port, ImageListener imageListener) {
this.host = host;
this.port = port;
this.imageListener = imageListener;
connector = new NioSocketConnector();
connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ImageCodecFactory(true)));
connector.setHandler(this);
}
public void messageReceived(IoSession session, Object message) throws Exception {
ImageResponse response = (ImageResponse) message;
imageListener.onImages(response.getImage1(), response.getImage2());
}
...
完整性考虑,现在附加一个服务器端的 IoHandler 代码
public class ImageServerIoHandler extends IoHandlerAdapter {
private final static String characters = "mina rocks abcdefghijklmnopqrstuvwxyz0123456789";
public static final String INDEX_KEY = ImageServerIoHandler.class.getName() + ".INDEX";
private Logger logger = LoggerFactory.getLogger(this.getClass());
public void sessionOpened(IoSession session) throws Exception {
session.setAttribute(INDEX_KEY, 0);
}
public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
IoSessionLogger sessionLogger = IoSessionLogger.getLogger(session, logger);
sessionLogger.warn(cause.getMessage(), cause);
}
public void messageReceived(IoSession session, Object message) throws Exception {
ImageRequest request = (ImageRequest) message;
String text1 = generateString(session, request.getNumberOfCharacters());
String text2 = generateString(session, request.getNumberOfCharacters());
BufferedImage image1 = createImage(request, text1);
BufferedImage image2 = createImage(request, text2);
ImageResponse response = new ImageResponse(image1, image2);
session.write(response);
}
private BufferedImage createImage(ImageRequest request, String text) {
BufferedImage image = new BufferedImage(request.getWidth(), request.getHeight(), BufferedImage.TYPE_BYTE_INDEXED);
Graphics graphics = image.createGraphics();
graphics.setColor(Color.YELLOW);
graphics.fillRect(0, 0, image.getWidth(), image.getHeight());
Font serif = new Font("serif", Font.PLAIN, 30);
graphics.setFont(serif);
graphics.setColor(Color.BLUE);
graphics.drawString(text, 10, 50);
return image;
}
private String generateString(IoSession session, int length) {
Integer index = (Integer) session.getAttribute(INDEX_KEY);
StringBuffer buffer = new StringBuffer(length);
while (buffer.length() < length) {
buffer.append(characters.charAt(index));
index++;
if (index >= characters.length()) {
index = 0;
}
}
session.setAttribute(INDEX_KEY, index);
return buffer.toString();
}
}
结论
关于编码和解码不仅于此。但是我希望本文足以让你开始了。不久的将来我会尝试着添加关于
DemuxingProtocolCodecFactory 的一些介绍。届时我们也将看一下如何使用定界符取代长度前缀的做法
译者注:翻译版本的项目源码见
https://github.com/waylau/apache-mina-2-user-guide-demos 中的