MyException - 我的异常网
当前位置:我的异常网» 开源软件 » netty学习【2】:netty中handler的执行顺序

netty学习【2】:netty中handler的执行顺序

www.MyException.Cn  网友分享于:2013-12-09  浏览:0次
netty学习【二】:netty中handler的执行顺序

1、简介

Handler在netty中,无疑占据着非常重要的地位。Handler与Servlet中的filter很像,通过Handler可以完成通讯报文的解码编码、拦截指定的报文、统一对日志错误进行处理、统一对请求进行计数、控制Handler执行与否。一句话,没有它做不到的只有你想不到的。

Netty中的所有handler都实现自ChannelHandler接口。按照输出输出来分,分为ChannelInboundHandler、ChannelOutboundHandler两大类。ChannelInboundHandler对从客户端发往服务器的报文进行处理,一般用来执行解码、读取客户端数据、进行业务处理等;ChannelOutboundHandler对从服务器发往客户端的报文进行处理,一般用来进行编码、发送报文到客户端。

Netty中,可以注册多个handler。ChannelInboundHandler按照注册的先后顺序执行;ChannelOutboundHandler按照注册的先后顺序逆序执行,如下图所示,按照注册的先后顺序对Handler进行排序,request进入Netty后的执行顺序为:
这里写图片描述

2、demo案例

这里写图片描述
如上即为本测试demo的例子:

2.1、服务器端

EchoServer.java

 

package send_order.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * • 配置服务器功能,如线程、端口 • 实现服务器处理程序,它包含业务逻辑,决定当有一个请求连接或接收数据时该做什么
 * 
 *
 *
 */
public class EchoServer {

    private final int port;

    public EchoServer(int port) {
        this.port = port;
    }

    public void start() throws Exception {
        EventLoopGroup eventLoopGroup = null;
        try {
            //server端引导类
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            //连接池处理数据
            eventLoopGroup = new NioEventLoopGroup();
            serverBootstrap.group(eventLoopGroup)
            .channel(NioServerSocketChannel.class)//指定通道类型为NioServerSocketChannel,一种异步模式,OIO阻塞模式为OioServerSocketChannel
            .localAddress("localhost",port)//设置InetSocketAddress让服务器监听某个端口已等待客户端连接。
            .childHandler(new ChannelInitializer<Channel>() {//设置childHandler执行所有的连接请求
                @Override
                protected void initChannel(Channel ch) throws Exception {
                    // 注册两个InboundHandler,执行顺序为注册顺序,所以应该是InboundHandler1 InboundHandler2
                    // 注册两个OutboundHandler,执行顺序为注册顺序的逆序,所以应该是OutboundHandler2 OutboundHandler1
                    ch.pipeline().addLast(new EchoInHandler1());
                    ch.pipeline().addLast(new EchoOutHandler1());
                    ch.pipeline().addLast(new EchoOutHandler2());
                    ch.pipeline().addLast(new EchoInHandler2());

                }
                    });
            // 最后绑定服务器等待直到绑定完成,调用sync()方法会阻塞直到服务器完成绑定,然后服务器等待通道关闭,因为使用sync(),所以关闭操作也会被阻塞。
            ChannelFuture channelFuture = serverBootstrap.bind().sync();
            System.out.println("开始监听,端口为:" + channelFuture.channel().localAddress());
            channelFuture.channel().closeFuture().sync();
        } finally {
            eventLoopGroup.shutdownGracefully().sync();
        }
    }

    public static void main(String[] args) throws Exception {
        new EchoServer(20000).start();
    }
}

 EchoInHandler1.java

 

 

package send_order.server;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;



public class EchoInHandler1 extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        System.out.println("in1");
         // 通知执行下一个InboundHandler
        ctx.fireChannelRead(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();//刷新后才将数据发出到SocketChannel
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

}

 EchoInHandler2.java

 

 

package send_order.server;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.Date;


public class EchoInHandler2 extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        System.out.println("in2");
        ByteBuf buf = (ByteBuf) msg;
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);
        String body = new String(req, "UTF-8");
        System.out.println("接收客户端数据:" + body);
        //向客户端写数据
        System.out.println("server向client发送数据");
        String currentTime = new Date(System.currentTimeMillis()).toString();
        ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
        ctx.write(resp);

    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();//刷新后才将数据发出到SocketChannel
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

}

 EchoOuteHandler1.java

 

 

package send_order.server;

import java.util.Date;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;

public class EchoOutHandler1 extends ChannelOutboundHandlerAdapter {
    @Override
    // 向client发送消息
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        System.out.println("out1");
        /*System.out.println(msg);*/

        String currentTime = new Date(System.currentTimeMillis()).toString();
        ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
        ctx.write(resp);
        ctx.flush();
       }
}

 EchoOuteHandler2.java

 

 

package send_order.server;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;

public class EchoOutHandler2 extends ChannelOutboundHandlerAdapter {

     @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            System.out.println("out2");
            // 执行下一个OutboundHandler
            /*System.out.println("at first..msg = "+msg);
            msg = "hi newed in out2";*/
            super.write(ctx, msg, promise);
        }

}

 

 

2.2、客户端

EchoClient.java

 

package send_order.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.net.InetSocketAddress;

/**
 * • 连接服务器 • 写数据到服务器 • 等待接受服务器返回相同的数据 • 关闭连接
 * 
 *
 *
 */
public class EchoClient {

    private final String host;
    private final int port;

    public EchoClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void start() throws Exception {
        EventLoopGroup nioEventLoopGroup = null;
        try {
            // 客户端引导类
            Bootstrap bootstrap = new Bootstrap();
            // EventLoopGroup可以理解为是一个线程池,这个线程池用来处理连接、接受数据、发送数据
            nioEventLoopGroup = new NioEventLoopGroup();
            bootstrap.group(nioEventLoopGroup)//多线程处理
                    .channel(NioSocketChannel.class)//指定通道类型为NioServerSocketChannel,一种异步模式,OIO阻塞模式为OioServerSocketChannel
                    .remoteAddress(new InetSocketAddress(host, port))//地址
                    .handler(new ChannelInitializer<SocketChannel>() {//业务处理类
                                @Override
                                protected void initChannel(SocketChannel ch)
                                        throws Exception {
                                    ch.pipeline().addLast(new EchoClientHandler());//注册handler
                                }
                            });
            // 链接服务器
            ChannelFuture channelFuture = bootstrap.connect().sync();
            channelFuture.channel().closeFuture().sync();
        } finally {
            nioEventLoopGroup.shutdownGracefully().sync();
        }
    }

    public static void main(String[] args) throws Exception {
        new EchoClient("localhost", 20000).start();
    }
}

 EchoClientHandler.java

 

 

package send_order.client;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    // 客户端连接服务器后被调用
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("客户端连接服务器,开始发送数据……");
        byte[] req = "QUERY TIME ORDER".getBytes();//消息
        ByteBuf firstMessage = Unpooled.buffer(req.length);//发送类
        firstMessage.writeBytes(req);//发送
        ctx.writeAndFlush(firstMessage);//flush
    }

    // • 从服务器接收到数据后调用
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg)
            throws Exception {
        System.out.println("client 读取server数据..");
        // 服务端返回消息后
        ByteBuf buf = (ByteBuf) msg;
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);
        String body = new String(req, "UTF-8");
        System.out.println("服务端数据为 :" + body);
    }

    // • 发生异常时被调用
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        System.out.println("client exceptionCaught..");
        // 释放资源
        ctx.close();
    }
}

 

 

3、结果展示

服务器端:

 

开始监听,端口为:/127.0.0.1:20000
in1
in2
接收客户端数据:QUERY TIME ORDER
server向client发送数据
out2
out1

 但是大家主要注意到我把源码中的:

 

 

ch.pipeline().addLast(new EchoInHandler1());
ch.pipeline().addLast(new EchoOutHandler1());
ch.pipeline().addLast(new EchoOutHandler2());
ch.pipeline().addLast(new EchoInHandler2());

 改为:

 

 

ch.pipeline().addLast(new EchoInHandler1());
ch.pipeline().addLast(new EchoInHandler2());
ch.pipeline().addLast(new EchoOutHandler1());
ch.pipeline().addLast(new EchoOutHandler2());

 大家可以观察到下面的结果:outhandler没有起作用
服务器端

 

开始监听,端口为:/127.0.0.1:20000
in1
in2
接收客户端数据:QUERY TIME ORDER
server向client发送数据

 

4、总结

在使用Handler的过程中,需要注意:
1、ChannelInboundHandler之间的传递,通过调用 ctx.fireChannelRead(msg) 实现;调用ctx.write(msg) 将传递到ChannelOutboundHandler。
2、ctx.write()方法执行后,需要调用flush()方法才能令它立即执行。
3、流水线pipeline中outhandler不能放在最后,否则不生效
4、Handler的消费处理放在最后一个处理。

文章评论

Web开发者需具备的8个好习惯
Web开发者需具备的8个好习惯
初级 vs 高级开发者 哪个性价比更高?
初级 vs 高级开发者 哪个性价比更高?
Web开发人员为什么越来越懒了?
Web开发人员为什么越来越懒了?
老美怎么看待阿里赴美上市
老美怎么看待阿里赴美上市
每天工作4小时的程序员
每天工作4小时的程序员
亲爱的项目经理,我恨你
亲爱的项目经理,我恨你
聊聊HTTPS和SSL/TLS协议
聊聊HTTPS和SSL/TLS协议
5款最佳正则表达式编辑调试器
5款最佳正则表达式编辑调试器
老程序员的下场
老程序员的下场
2013年美国开发者薪资调查报告
2013年美国开发者薪资调查报告
程序员最害怕的5件事 你中招了吗?
程序员最害怕的5件事 你中招了吗?
那些性感的让人尖叫的程序员
那些性感的让人尖叫的程序员
2013年中国软件开发者薪资调查报告
2013年中国软件开发者薪资调查报告
写给自己也写给你 自己到底该何去何从
写给自己也写给你 自己到底该何去何从
中美印日四国程序员比较
中美印日四国程序员比较
程序员的鄙视链
程序员的鄙视链
程序员和编码员之间的区别
程序员和编码员之间的区别
我的丈夫是个程序员
我的丈夫是个程序员
程序员应该关注的一些事儿
程序员应该关注的一些事儿
如何区分一个程序员是“老手“还是“新手“?
如何区分一个程序员是“老手“还是“新手“?
什么才是优秀的用户界面设计
什么才是优秀的用户界面设计
科技史上最臭名昭著的13大罪犯
科技史上最臭名昭著的13大罪犯
团队中“技术大拿”并非越多越好
团队中“技术大拿”并非越多越好
不懂技术不要对懂技术的人说这很容易实现
不懂技术不要对懂技术的人说这很容易实现
“肮脏的”IT工作排行榜
“肮脏的”IT工作排行榜
看13位CEO、创始人和高管如何提高工作效率
看13位CEO、创始人和高管如何提高工作效率
“懒”出效率是程序员的美德
“懒”出效率是程序员的美德
要嫁就嫁程序猿—钱多话少死的早
要嫁就嫁程序猿—钱多话少死的早
程序员的一天:一寸光阴一寸金
程序员的一天:一寸光阴一寸金
编程语言是女人
编程语言是女人
我跳槽是因为他们的显示器更大
我跳槽是因为他们的显示器更大
漫画:程序员的工作
漫画:程序员的工作
做程序猿的老婆应该注意的一些事情
做程序猿的老婆应该注意的一些事情
10个帮程序员减压放松的网站
10个帮程序员减压放松的网站
我是如何打败拖延症的
我是如何打败拖延症的
十大编程算法助程序员走上高手之路
十大编程算法助程序员走上高手之路
Java程序员必看电影
Java程序员必看电影
如何成为一名黑客
如何成为一名黑客
鲜为人知的编程真相
鲜为人知的编程真相
程序猿的崛起——Growth Hacker
程序猿的崛起——Growth Hacker
 程序员的样子
程序员的样子
当下全球最炙手可热的八位少年创业者
当下全球最炙手可热的八位少年创业者
60个开发者不容错过的免费资源库
60个开发者不容错过的免费资源库
代码女神横空出世
代码女神横空出世
总结2014中国互联网十大段子
总结2014中国互联网十大段子
那些争议最大的编程观点
那些争议最大的编程观点
程序员眼里IE浏览器是什么样的
程序员眼里IE浏览器是什么样的
为什么程序员都是夜猫子
为什么程序员都是夜猫子
10个调试和排错的小建议
10个调试和排错的小建议
软件开发程序错误异常ExceptionCopyright © 2009-2015 MyException 版权所有