概述
Netty是一款异步的事件驱动的网络应用程序框架,支持快速地开发可维护的高性能的面向协议的服务器和客户端。为什么它就能做很多中间件的底层网络框架,为什么快,异步指什么,事件驱动又是什么?这是看Netty介绍先有的疑问。
NIO
传统socket编程,一般就几步:
- 监听端口,建立Socket连接
- 建立线程,处理业务逻辑
- 关闭连接
下方代码为简单的socket服务器(Echo Server):
public static void main(String[] args) throws IOException {
ServerSocket serverSocket = new ServerSocket(8080);
Socket socket = serverSocket.accept();
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
String request;
while ((request = in.readLine()) != null) {
System.out.println(request);
out.println("Hello client");
}
in.close();
out.close();
socket.close();
}
这个程序只能处理一个连接,如果支持多个客户端,需要起不同的线程来处理连接,线程的开销就成了瓶颈。JDK1.4之后,可以使用NIO实现(非阻塞型I/O )。
public class PlainNioServer {
public static void main(String[] args) throws IOException {
new PlainNioServer().serve(8080);
}
public void serve(int port) throws IOException {
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
ServerSocket ss = serverChannel.socket();
InetSocketAddress address = new InetSocketAddress(port);
ss.bind(address);
Selector selector = Selector.open();
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
final ByteBuffer msg = ByteBuffer.wrap("Hi!\r\n".getBytes());
for (; ; ) {
selector.select();
Set<SelectionKey> readyKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = readyKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel client = server.accept();
client.configureBlocking(false);
client.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ, msg.duplicate());
System.out.println("Accepted connection from " + client);
}
if (key.isWritable()) {
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer buffer = (ByteBuffer) key.attachment();
while (buffer.hasRemaining()) {
if (client.write(buffer) == 0) {
break;
}
}
client.close();
}
}
}
}
}
代码是这个熊样,处理复杂,业务逻辑和网络代码混杂。
Netty基本概念
异步事件驱动
- 不必等待一个操作的完成。
- 选择器能够通过较少的线程便可监视许多连接上的事件。
Channel 它代表一个到实体(如一个硬件设备、一个文件、一个网络套接字或者一个能够执行一个或者多个不同的I/O操作的程序组件)的开放连接,如读操作和写操作。
回调 简单说时候就是调用者调用执行者方法后,执行者处理完后又调用调用者的方法。
Future jdk已经一共了Future接口,表示对获取一个异步执行的状态或者返回值。但要获取这些状态,必须阻塞或者轮询查询,非常麻烦。Netty提供了异步回调的ChannelFuture。我们能够注册一个或者多个ChannelFutureListener实例,监听器的回调方法operationComplete(),将会在对应的操作完成时被通知,真正做到异步,非阻塞。
每个 Netty 的出站 I/O 操作都将返回一个 ChannelFuture,也就是说,它们都不会阻塞。正如我们前面所提到过的一样,Netty完全是异步和事件驱动的。事实上,回调和Future是相互补充的机制;它们相互结合,构成了Netty 本身的关键构件块之一。
如果在ChannelFutureListener添加到ChannelFuture的时候,ChannelFuture已经完成, 那么该 ChannelFutureListener 将会被直接地通知
ChannelHandler Netty使用不同的事件来通知我们状态的改变或者是操作的状态,每个事件都可以被分发给 ChannelHandler 类中的某个实现方法处理。
在Netty里,所有事件都来自ChannelEvent接口,这些事件涵盖监听端口、建立连接、读写数据等网络通讯的各个阶段。而事件的处理者就是ChannelHandler,Netty内部的连接处理、协议编解码、超时等机制,都是通过handler完成。
图片来自Netty那点事(一)概述
Byteuf,Reactor,Pipeline 先不解释。
Echo Server
有了上面俩个示例,再看netty实现的简单Echo Server。
所有的 Netty 服务器都需要以下两部分:
- 至少一个ChannelHandler,该组件实现了服务器对从客户端接收的数据的处理,即它的业务逻辑。
- 引导,这是配置服务器的启动代码,如端口监听。
ChannelHandler
@ChannelHandler.Sharable//标示一个 Channel- Handler 可以被多 个 Channel 安全地 共享
public class EchoServerHandler extends ChannelInboundHandlerAdapter{
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf bytebuf = (ByteBuf) msg;
System.out.println("Server received:"+ bytebuf.toString(CharsetUtil.UTF_8));
ctx.write(bytebuf);// 收到的消息写给发送者
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// 将未决消息冲刷到远程节点,并且关闭该Channel
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();// 打印异常
ctx.close();// 关闭连接
}
}
- ChannelInboundHandler: 定义了
ChannelInboundHandler
的默认实现,处理入站消息,并且它的每个方法都可以被重写以挂钩到事件生命周期的恰当点上。 - 针对不同事件调用不同ChannelHandler,handler的方法又可以挂到时间的对应声明周期,业务逻辑在handler处理,和其他网络配置分开。
- 如果不捕获异常,异常将在ChannelPipeline中传递。
引导服务器
public class EchoServer {
private final int port;
public EchoServer(int port) {
this.port = port;
}
public static void main(String[] args) throws Exception {
int port = 8080;
new EchoServer(port).start();
}
private void start() throws Exception {
final EchoServerHandler serverHandler = new EchoServerHandler();
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // 指定所使用的 NIO 传输 Channel
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO)) // log level
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(serverHandler); // 添加channelHandler
}
});
ChannelFuture future = b.bind(port).sync(); // 异步绑定,但阻塞等待
future.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
客户端代码省略,详见官网。
参考列表: