MyException - 我的异常网
当前位置:我的异常网» 开源软件 » netty 抽象nio讯息通道

netty 抽象nio讯息通道

www.MyException.Cn  网友分享于:2013-10-08  浏览:0次
netty 抽象nio消息通道
netty 通道接口定义:http://donald-draper.iteye.com/blog/2392740
netty 抽象通道初始化:http://donald-draper.iteye.com/blog/2392801
netty 抽象Unsafe定义:http://donald-draper.iteye.com/blog/2393053
netty 通道Outbound缓冲区:http://donald-draper.iteye.com/blog/2393098
netty 抽象通道后续:http://donald-draper.iteye.com/blog/2393166
netty 抽象nio通道:http://donald-draper.iteye.com/blog/2393269
netty 抽象nio字节通道:http://donald-draper.iteye.com/blog/2393323
引言
上一篇文章我们看了抽象nio字节通道,先来回顾一下:
写通道Outbound缓冲区,即遍历刷新链上的写请求,如果写请求消息为字节buf,则调用doWriteBytes完成实际数据发送操作,待子类扩展,如果写请求消息为文件Region,调用doWriteFileRegion完成实际数据发送操作,待子类扩展,数据发送,则更新通道的数据发送进度,并从刷新链上移除写请求;如果所有写请求发送完毕,则重新添加写操作事件到选择key兴趣事件集,否则继续刷新通道Outbound缓冲区中的写请求。

nio字节Unsafe读操作,从通道接收缓冲区读取数据,通知通道处理读取数据,触发Channel管道线的fireChannelRead事件,待数据读取完毕,触发Channel管道线的fireChannelReadComplete事件,如果在读数据的过程中,通道关闭,则触发通道输入关闭事件(fireUserEventTriggered),如果在读数据的过程中,发生异常,则读取缓存区中没有读完的数据,并通道通道处理剩余数据。
抽象nio字节通道是面向字节的通道,为Socket通道的父类,
今天我们来看ServerSocket通道的父类AbstractNioMessageChannel,面向消息的通道:
package io.netty.channel.nio;

import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.ServerChannel;

import java.io.IOException;
import java.net.PortUnreachableException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

/**
 * {@link AbstractNioChannel} base class for {@link Channel}s that operate on messages.
 */
public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
    boolean inputShutdown;//是否关闭输入流

    /**
     * @see AbstractNioChannel#AbstractNioChannel(Channel, SelectableChannel, int)
     */
    protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent, ch, readInterestOp);
    }
}

来看实际写操作:
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    final SelectionKey key = selectionKey();
    final int interestOps = key.interestOps();

    for (;;) {
        Object msg = in.current();
        if (msg == null) {
            // Wrote all messages.
            if ((interestOps & SelectionKey.OP_WRITE) != 0) {
	       //消息已发送完,从选择key兴趣集中移除写操作事件
                key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
            }
            break;
        }
        try {
            boolean done = false;
            for (int i = config().getWriteSpinCount() - 1; i >= 0; i--) {
	        //写消息
                if (doWriteMessage(msg, in)) {
                    done = true;
                    break;
                }
            }

            if (done) {//如果读完消息,则从通道刷新链上移除写请求
                in.remove();
            } else {
                // Did not write all messages.
                if ((interestOps & SelectionKey.OP_WRITE) == 0) {
		    //消息没发送完,如果需要添加写事件到选择key的兴趣事件集
                    key.interestOps(interestOps | SelectionKey.OP_WRITE);
                }
                break;
            }
        } catch (IOException e) {
            if (continueOnWriteError()) {
	        //如果写异常时需要移除写请求,则移除
                in.remove(e);
            } else {
                throw e;
            }
        }
    }
}

/**
 * Write a message to the underlying {@link java.nio.channels.Channel}.
 *写一个消息到底层通道,待子类扩展
 * @return {@code true} if and only if the message has been written
 */
protected abstract boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception;
/**

 * Returns {@code true} if we should continue the write loop on a write error.
 */
protected boolean continueOnWriteError() {
    return false;
}

从上面可以看出 抽象Nio消息通道,写通道Outbound缓冲区消息,即遍历通道Outbound缓冲区刷新链,当写消息请求为空时,从选择key兴趣集中移除写操作事件,否则,委托doWriteMessage方法,将消息写到底层通道,doWriteMessage方法待子类扩展,写完,将写请求从刷新链上移除,否则,如果需要,添加写事件到选择key的兴趣事件集。

再来看其他方法:
//开始读操作
@Override
protected void doBeginRead() throws Exception {
    if (inputShutdown) {
        return;
    }
    super.doBeginRead();
}
//创建与底层通道交流的Unsafe
@Override
protected AbstractNioUnsafe newUnsafe() {
    return new NioMessageUnsafe();
}


从上面的方法可以看出,实际返回的为nio消息Unsafe,我们来看NioMessageUnsafe,
private final class NioMessageUnsafe extends AbstractNioUnsafe {
    private final List<Object> readBuf = new ArrayList<Object>();
    @Override
    public void read() {
        assert eventLoop().inEventLoop();
	//获取通道配置,Channel管道,接受字节buf分配器Handle
        final ChannelConfig config = config();
        final ChannelPipeline pipeline = pipeline();
        final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
        allocHandle.reset(config);
        boolean closed = false;
        Throwable exception = null;
        try {
            try {
                do {
		    //从通道缓冲区读取数据
                    int localRead = doReadMessages(readBuf);
                    if (localRead == 0) {
		       //没有数据可读取
                        break;
                    }
                    if (localRead < 0) {
		       //通道已关闭
                        closed = true;
                        break;
                    }
                    //更新读取消息计数器
                    allocHandle.incMessagesRead(localRead);
                } while (allocHandle.continueReading());
            } catch (Throwable t) {
                exception = t;
            }
            
            int size = readBuf.size();
	    //遍历读取的消息集,通知通道处理消息,即触发管道fireChannelRead事件
            for (int i = 0; i < size; i ++) {
                readPending = false;
                pipeline.fireChannelRead(readBuf.get(i));
            }
            readBuf.clear();
            allocHandle.readComplete();
	    //读取完毕,触发管道fireChannelReadComplete事件
            pipeline.fireChannelReadComplete();

            if (exception != null) {
	        //根据异常判断是否需要,关闭读任务
                closed = closeOnReadError(exception);
                //触发通道fireExceptionCaught事件
                pipeline.fireExceptionCaught(exception);
            }

            if (closed) {
	        //关闭读任务
                inputShutdown = true;
                if (isOpen()) {
                    close(voidPromise());
                }
            }
        } finally {
            // Check if there is a readPending which was not processed yet.
            // This could be for two reasons:
            // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
            // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
            //
            // See https://github.com/netty/netty/issues/2254
            if (!readPending && !config.isAutoRead()) {
	        //如果读任务完毕,且不需自动读,则从选择key兴趣事件集移除读操作事件
                removeReadOp();
            }
        }
    }
}

//AbstractNioMessageChannel

/**
 * Read messages into the given array and return the amount which was read.
 从通道缓冲区读取消息,方法指定的buf集合中,并返回读取的消息数量,待子类扩展
 */
protected abstract int doReadMessages(List<Object> buf) throws Exception;


//判断异常发生时,是否需要关闭读任务
protected boolean closeOnReadError(Throwable cause) {
    // ServerChannel should not be closed even on IOException because it can often continue
    // accepting incoming connections. (e.g. too many open files)
    return cause instanceof IOException &&
            !(cause instanceof PortUnreachableException) &&
            !(this instanceof ServerChannel);
}

从上面可以看出:nio消息Unsafe读操作,从通道接收缓冲区读取数据,
通知通道处理读取数据,触发Channel管道线的fireChannelRead事件,待数据读取完毕,
触发Channel管道线的fireChannelReadComplete事件,如果在读数据的过程中,通道关闭,则
触发通道输入关闭事件(fireUserEventTriggered),如果在读数据的过程中,发生异常,
则触发通道fireExceptionCaught事件,如果读任务完毕,且不需自动读,
则从选择key兴趣事件集移除读操作事件



总结:
抽象Nio消息通道AbstractNioMessageChannel,写通道Outbound缓冲区消息,即遍历通道Outbound缓冲区刷新链,当写消息请求为空时,从选择key兴趣集中移除写操作事件,否则,委托doWriteMessage方法,将消息写到底层通道,doWriteMessage方法待子类扩展,写完,将写请求从刷新链上移除,否则,如果需要,添加写事件到选择key的兴趣事件集。

nio消息Unsafe(NioMessageUnsafe)读操作,从通道接收缓冲区读取数据,通知通道处理读取数据,触发Channel管道线的fireChannelRead事件,待数据读取完毕,触发Channel管道线的fireChannelReadComplete事件,如果在读数据的过程中,通道关闭,则触发通道输入关闭事件(fireUserEventTriggered),如果在读数据的过程中,发生异常,则触发通道fireExceptionCaught事件,如果读任务完毕,且不需自动读,则从选择key兴趣事件集移除读操作事件

文章评论

编程语言是女人
编程语言是女人
5款最佳正则表达式编辑调试器
5款最佳正则表达式编辑调试器
程序猿的崛起——Growth Hacker
程序猿的崛起——Growth Hacker
如何区分一个程序员是“老手“还是“新手“?
如何区分一个程序员是“老手“还是“新手“?
团队中“技术大拿”并非越多越好
团队中“技术大拿”并非越多越好
科技史上最臭名昭著的13大罪犯
科技史上最臭名昭著的13大罪犯
Java程序员必看电影
Java程序员必看电影
每天工作4小时的程序员
每天工作4小时的程序员
2013年中国软件开发者薪资调查报告
2013年中国软件开发者薪资调查报告
为啥Android手机总会越用越慢?
为啥Android手机总会越用越慢?
Web开发人员为什么越来越懒了?
Web开发人员为什么越来越懒了?
中美印日四国程序员比较
中美印日四国程序员比较
我的丈夫是个程序员
我的丈夫是个程序员
程序员最害怕的5件事 你中招了吗?
程序员最害怕的5件事 你中招了吗?
鲜为人知的编程真相
鲜为人知的编程真相
做程序猿的老婆应该注意的一些事情
做程序猿的老婆应该注意的一些事情
那些性感的让人尖叫的程序员
那些性感的让人尖叫的程序员
聊聊HTTPS和SSL/TLS协议
聊聊HTTPS和SSL/TLS协议
程序员周末都喜欢做什么?
程序员周末都喜欢做什么?
亲爱的项目经理,我恨你
亲爱的项目经理,我恨你
老程序员的下场
老程序员的下场
老美怎么看待阿里赴美上市
老美怎么看待阿里赴美上市
程序员和编码员之间的区别
程序员和编码员之间的区别
我是如何打败拖延症的
我是如何打败拖延症的
代码女神横空出世
代码女神横空出世
初级 vs 高级开发者 哪个性价比更高?
初级 vs 高级开发者 哪个性价比更高?
程序员的鄙视链
程序员的鄙视链
10个帮程序员减压放松的网站
10个帮程序员减压放松的网站
程序员眼里IE浏览器是什么样的
程序员眼里IE浏览器是什么样的
总结2014中国互联网十大段子
总结2014中国互联网十大段子
漫画:程序员的工作
漫画:程序员的工作
 程序员的样子
程序员的样子
“肮脏的”IT工作排行榜
“肮脏的”IT工作排行榜
程序员应该关注的一些事儿
程序员应该关注的一些事儿
程序员的一天:一寸光阴一寸金
程序员的一天:一寸光阴一寸金
那些争议最大的编程观点
那些争议最大的编程观点
什么才是优秀的用户界面设计
什么才是优秀的用户界面设计
不懂技术不要对懂技术的人说这很容易实现
不懂技术不要对懂技术的人说这很容易实现
Java 与 .NET 的平台发展之争
Java 与 .NET 的平台发展之争
“懒”出效率是程序员的美德
“懒”出效率是程序员的美德
旅行,写作,编程
旅行,写作,编程
要嫁就嫁程序猿—钱多话少死的早
要嫁就嫁程序猿—钱多话少死的早
十大编程算法助程序员走上高手之路
十大编程算法助程序员走上高手之路
Web开发者需具备的8个好习惯
Web开发者需具备的8个好习惯
60个开发者不容错过的免费资源库
60个开发者不容错过的免费资源库
Google伦敦新总部 犹如星级庄园
Google伦敦新总部 犹如星级庄园
我跳槽是因为他们的显示器更大
我跳槽是因为他们的显示器更大
如何成为一名黑客
如何成为一名黑客
程序员必看的十大电影
程序员必看的十大电影
软件开发程序错误异常ExceptionCopyright © 2009-2015 MyException 版权所有