03、Netty 教程 – 初窥Netty编程

作者:唐亚峰 | 出自:唐亚峰博客

在上一篇文章中介绍了NIO类库简介,从本章开始都围绕Netty展开讨论和概述……

什么是Netty

Netty是业界有名且最流行的NIO框架之一,健壮,稳定,高性能,可定制,可扩展在同类框架都是首屈一指,而且成功的运用在各大商业项目中,比如HadoopRPC框架avro,当当接盘的DubboX都在使用…

Netty 的优点

  • API使用简单,开发门槛低
  • 功能强大,多种解码与编码器
  • 支持多种主流的通讯协议
  • 定制能力强大,可以通过ChannelHandler对通讯框架灵活的扩展
  • 相比业界主流NIO框架,Netty综合评价更高
  • 成熟稳定,社区活跃

Netty 缺点

  • 5.x 模型存在问题,已被废弃

编译

GIT:https://github.com/netty/netty

如果需要编译Netty,需要最低JDK1.7,在运行时Netty3.x只需要JDK1.5,同时博客参考 李林峰 大神的 《Netty权威指南第二版》

因为主要是学习Netty,而不是实战,同时为了更好的适配即将推出的Netty6,用Netty5的API也许会更好点,就当为Netty6做技术储备吧…

如果使用Maven,在项目中需要添加

<dependencies>
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>5.0.0.Alpha2</version>
    </dependency>
</dependencies>

Hello Netty

继续用TimeServerTimeClient 为例,改造代码使用Netty实现服务端与客户端的通讯,以及上一章博客遗留的数据丢失问题,在使用Netty后都是不在的…

TimeServer

public static void bind(int port) {
    EventLoopGroup masterGroup = new NioEventLoopGroup();//创建线程组
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
        ServerBootstrap bootstrap = new ServerBootstrap();//创建NIO服务端启动辅助类
        bootstrap.group(masterGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)//连接数
                    .option(ChannelOption.TCP_NODELAY, true)//不延迟,立即推送
                    .childHandler(new ChildChannelHandler());
        //绑定端口,同步等待成功,
        System.out.println("绑定端口,同步等待成功......");
        ChannelFuture future = bootstrap.bind(port).sync();
        //等待服务端监听端口关闭
        future.channel().closeFuture().sync();
        System.out.println("等待服务端监听端口关闭......");
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        //优雅退出释放线程池
        masterGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
        System.out.println("优雅退出释放线程池......");
    }
}

1、 实例化个含NIO线程的线程组,专门用来处理网络事件,管理线程;
2、 使用ServerBootstrap类来初始化Netty服务器,并且开始监听端口的socket请求;
3、 根据ServerBootstrap內封装好的方法设置服务器基础信息;
4、 设置服务端的连接数ChannelOption.SO_BACKLOG1024
5、 设置服务端消息不延迟,立即推送ChannelOption.TCP_NODELAY
6、 配置好服务器,在服务器启动时绑定闯入的port端口,等待同步;
7、 优雅退出,释放资源;

数据操作类ServerHandler:在该类中对客户端发来的数据进行操作,发送给客户端数据

private static class ChildChannelHandler extends ChannelInitializer {
    @Override
    protected void initChannel(Channel channel) throws Exception {
        //创建Channel通道
        channel.pipeline().addLast(new TimeServerHandler());//往通道中添加i/o事件处理类
    }
    private static class TimeServerHandler extends ChannelHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            //channelRead方法中的msg参数(服务器接收到的客户端发送的消息)强制转换成ByteBuf类型
            ByteBuf buf = (ByteBuf) msg;
            byte[] req = new byte[buf.readableBytes()];
            buf.readBytes(req);
            String body = new String(req, "UTF-8");//返回String,指定编码(有可能出现不支持的编码类型异常,所以要trycatch
            System.out.println("TimeServer 接收到的消息 :" + body);
            ByteBuf resp = Unpooled.copiedBuffer("你在说什么呢...".getBytes());
            ctx.write(resp);// 将消息放入缓冲数组
        }
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            //将消息队列中信息写入到SocketChannel中去,解决了频繁唤醒Selector所带来不必要的性能开销
            //Netty的 write 只是将消息放入缓冲数组,再通过调用 flush 才会把缓冲区的数据写入到 SocketChannel
            ctx.flush();
        }
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();//发生异常时候,执行重写后的 exceptionCaught 进行资源关闭
        }
    }
public static void main(String[] args) {
    TimeServer.bind(4040);
}

ChildChannelHandler可以看到,只需重写对应的方法,即可简单的实现一个Netty通讯的服务端,这里的ByteBuf可以看成是NettyByteBuffer的增强实现…

接下来编写TimeClient程序

TimeClient

public static void connect(String host, int port) {
    EventLoopGroup group = new NioEventLoopGroup();
    Bootstrap bootstrap = new Bootstrap();
    ChannelFuture future = null;
    try {
        bootstrap.group(group).channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)//不延迟,消息立即推送
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new TimeClientHandler());
                    }
                });
        //发起异步请求
        future = bootstrap.connect(host, port).sync();
        //等待客户端链路关闭
        future.channel().closeFuture().sync();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        group.shutdownGracefully();
    }
}

一个很简单的connect连接操作,在服务端我们使用的是ServerBootstrap 客户端使用 Bootstrap就可以了,是不是很像NIO编程中的ServerSocketChannelSocketChannel,这里与服务端不同的是它的Channel需要设置为NioSocketChannel,然后为其添加一个handler…..

private static class TimeClientHandler extends ChannelHandlerAdapter {
    private final ByteBuf firstMessage;
    public TimeClientHandler() {
        byte[] req = "QUERY TIME ORDER ".getBytes();
        firstMessage = Unpooled.buffer(req.length);
        firstMessage.writeBytes(req);
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(firstMessage);//推送消息
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);
        String body = new String(req, "UTF-8");
        System.out.println("TimeClient 接收到的消息 :" + body);
        //ctx.close();//接受完消息关闭连接,注释掉可以看到释放资源,否则请求完后就关闭连接是看不到异常情况的
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("释放资源:" + cause.getMessage());//不重写将会看到堆栈信息以及资源无法关闭
        ctx.close();
    }
public static void main(String[] args) {
    TimeClient.connect("127.0.0.1", 4040);
}

1、 首先跟服务器操作类一样继承ChannelHandlerAdapter类,重写channelReadchannelActiveexceptionCaught三个方法;
2、 其中channelActive方法是用来发送客户端信息的,channelRead方法客户端是接收服务器数据的;
3、 先声明一个全局变量firstMessage,用来接收客户端发出去的信息的值;
4、channelActive方法中,把要传输的数据转化为字节数组;
5、channelRead方法中,通过ByteBuf接收服务端回复的内容,然后关闭当前连接(当然具体是否关闭取决于业务了,如果请求完就关闭连接是看不到释放资源的日志);
6、exceptionCaught方法中,可以处理一些异常情况,比如服务器关闭,该方法会监听到…;

测试一下

启动TimeServer

绑定端口,同步等待成功......
TimeServer 接收到的消息 :QUERY TIME ORDER

启动TimeClient 然后关闭 TimeServer

TimeClient 接收到的消息 :你在说什么呢...
释放资源:远程主机强迫关闭了一个现有的连接。

– 说点什么

全文代码:https://git.oschina.net/battcn/battcn-netty/tree/master/Chapter3-1/battcn-netty-1

附录:Netty 教程系列文章