Netty实战-起步

概述

Netty是一款异步的事件驱动的网络应用程序框架,支持快速地开发可维护的高性能的面向协议的服务器和客户端。为什么它就能做很多中间件的底层网络框架,为什么快,异步指什么,事件驱动又是什么?这是看Netty介绍先有的疑问。

NIO

传统socket编程,一般就几步:

  • 监听端口,建立Socket连接
  • 建立线程,处理业务逻辑
  • 关闭连接

下方代码为简单的socket服务器(Echo Server):

public static void main(String[] args) throws IOException {
        ServerSocket serverSocket = new ServerSocket(8080);
        Socket socket = serverSocket.accept();
        BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
        PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
        String request;
        while ((request = in.readLine()) != null) {
            System.out.println(request);
            out.println("Hello client");
        }
        in.close();
        out.close();
        socket.close();
    }

这个程序只能处理一个连接,如果支持多个客户端,需要起不同的线程来处理连接,线程的开销就成了瓶颈。JDK1.4之后,可以使用NIO实现(非阻塞型I/O )。

public class PlainNioServer {

    public static void main(String[] args) throws IOException {
        new PlainNioServer().serve(8080);
    }

    public void serve(int port) throws IOException {
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        ServerSocket ss = serverChannel.socket();
        InetSocketAddress address = new InetSocketAddress(port);
        ss.bind(address);
        Selector selector = Selector.open();
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
        final ByteBuffer msg = ByteBuffer.wrap("Hi!\r\n".getBytes());
        for (; ; ) {
            selector.select();
            Set<SelectionKey> readyKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = readyKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                iterator.remove();
                if (key.isAcceptable()) {
                    ServerSocketChannel server = (ServerSocketChannel) key.channel();
                    SocketChannel client = server.accept();
                    client.configureBlocking(false);
                    client.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ, msg.duplicate());
                    System.out.println("Accepted connection from " + client);
                }
                if (key.isWritable()) {
                    SocketChannel client = (SocketChannel) key.channel();
                    ByteBuffer buffer = (ByteBuffer) key.attachment();
                    while (buffer.hasRemaining()) {
                        if (client.write(buffer) == 0) {
                            break;
                        }
                    }
                    client.close();
                }
            }
        }
    }
}

代码是这个熊样,处理复杂,业务逻辑和网络代码混杂。

Netty基本概念

异步事件驱动

  • 不必等待一个操作的完成。
  • 选择器能够通过较少的线程便可监视许多连接上的事件。

Channel 它代表一个到实体(如一个硬件设备、一个文件、一个网络套接字或者一个能够执行一个或者多个不同的I/O操作的程序组件)的开放连接,如读操作和写操作。

回调 简单说时候就是调用者调用执行者方法后,执行者处理完后又调用调用者的方法。

Future jdk已经一共了Future接口,表示对获取一个异步执行的状态或者返回值。但要获取这些状态,必须阻塞或者轮询查询,非常麻烦。Netty提供了异步回调的ChannelFuture。我们能够注册一个或者多个ChannelFutureListener实例,监听器的回调方法operationComplete(),将会在对应的操作完成时被通知,真正做到异步,非阻塞。

每个 Netty 的出站 I/O 操作都将返回一个 ChannelFuture,也就是说,它们都不会阻塞。正如我们前面所提到过的一样,Netty完全是异步和事件驱动的。事实上,回调和Future是相互补充的机制;它们相互结合,构成了Netty 本身的关键构件块之一。

如果在ChannelFutureListener添加到ChannelFuture的时候,ChannelFuture已经完成, 那么该 ChannelFutureListener 将会被直接地通知

ChannelHandler Netty使用不同的事件来通知我们状态的改变或者是操作的状态,每个事件都可以被分发给 ChannelHandler 类中的某个实现方法处理。

在Netty里,所有事件都来自ChannelEvent接口,这些事件涵盖监听端口、建立连接、读写数据等网络通讯的各个阶段。而事件的处理者就是ChannelHandler,Netty内部的连接处理、协议编解码、超时等机制,都是通过handler完成。

图片来自Netty那点事(一)概述

Byteuf,Reactor,Pipeline 先不解释。

Echo Server

有了上面俩个示例,再看netty实现的简单Echo Server。

所有的 Netty 服务器都需要以下两部分:

  1. 至少一个ChannelHandler,该组件实现了服务器对从客户端接收的数据的处理,即它的业务逻辑。
  2. 引导,这是配置服务器的启动代码,如端口监听。

ChannelHandler

@ChannelHandler.Sharable//标示一个 Channel- Handler 可以被多 个 Channel 安全地 共享
public class EchoServerHandler extends ChannelInboundHandlerAdapter{
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf bytebuf = (ByteBuf) msg;
        System.out.println("Server received:"+ bytebuf.toString(CharsetUtil.UTF_8));
        ctx.write(bytebuf);// 收到的消息写给发送者
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // 将未决消息冲刷到远程节点,并且关闭该Channel
        ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();// 打印异常
        ctx.close();// 关闭连接
    }
}
  • ChannelInboundHandler: 定义了ChannelInboundHandler的默认实现,处理入站消息,并且它的每个方法都可以被重写以挂钩到事件生命周期的恰当点上。
  • 针对不同事件调用不同ChannelHandler,handler的方法又可以挂到时间的对应声明周期,业务逻辑在handler处理,和其他网络配置分开。
  • 如果不捕获异常,异常将在ChannelPipeline中传递。

引导服务器

public class EchoServer {
    private final int port;

    public EchoServer(int port) {
        this.port = port;
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        new EchoServer(port).start();
    }

    private void start() throws Exception {
        final EchoServerHandler serverHandler = new EchoServerHandler();
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)     // 指定所使用的 NIO 传输 Channel
             .option(ChannelOption.SO_BACKLOG, 100)
             .handler(new LoggingHandler(LogLevel.INFO)) // log level
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 protected void initChannel(SocketChannel ch) throws Exception {
                     ch.pipeline().addLast(serverHandler); // 添加channelHandler
                 }
             });
            ChannelFuture future = b.bind(port).sync();   // 异步绑定,但阻塞等待
            future.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

客户端代码省略,详见官网。


参考列表:

CONTENTS