MyException - 我的异常网
当前位置:我的异常网» 开源软件 » netty 抽象nio字节通路

netty 抽象nio字节通路

www.MyException.Cn  网友分享于:2013-10-19  浏览:0次
netty 抽象nio字节通道
netty 通道接口定义:http://donald-draper.iteye.com/blog/2392740
netty 抽象通道初始化:http://donald-draper.iteye.com/blog/2392801
netty 抽象Unsafe定义:http://donald-draper.iteye.com/blog/2393053
netty 通道Outbound缓冲区:http://donald-draper.iteye.com/blog/2393098
netty 抽象通道后续:http://donald-draper.iteye.com/blog/2393166
netty 抽象nio通道:http://donald-draper.iteye.com/blog/2393269
引言
前一篇文章,我们看了抽象nio通道,先来回顾一下:
抽象nio通道AbstractNioChannel内部关联一个可选择通道(SelectableChannel)和一个选择key(selectionKey)。抽象Nio通道构造,主要是初始化通道并配置为非阻塞模式。

注册doRegister工作主要是,注册可选择通道到通道所在事件循环的选择器中。反注册doDeregister,委托给事件循环,取消选择key,即从事件循环关联选择器的选择key集合中移除当前选择key。开始读操作doBeginRead,实际工作为将读操作事件,添加选择key的兴趣事件集

抽象nioUnsafe为特殊的Unsafe,允许访问底层的选择通道。选择通道方法返回的实际为抽象nio通道内部的底层可选择通道。移除读兴趣事件removeReadOp,即从选择key兴趣事件集中,移除读操作事件。连接操作,将实际连接操作委托给doConnect,待子类实现,如果连接成功,则通知异步任务连接成功,如果是第一次连接,则触发通道的激活事件fireChannelActive。完成连接操作,实际工作委托给抽象Nio通道的doFinishConnect方法,待子类实现,完成后更新任务结果,触发通道的激活事件fireChannelActive,如果出现异常,则更新连接任务为异常失败。

今天我们来看一下抽象nio字节通道AbstractNioByteChannel
package io.netty.channel.nio;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.FileRegion;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.socket.ChannelInputShutdownEvent;
import io.netty.channel.socket.ChannelInputShutdownReadComplete;
import io.netty.util.internal.StringUtil;

import java.io.IOException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;

/**
 * {@link AbstractNioChannel} base class for {@link Channel}s that operate on bytes.
 */
public abstract class AbstractNioByteChannel extends AbstractNioChannel {
    private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);//通道元数据
    private static final String EXPECTED_TYPES =
            " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
            StringUtil.simpleClassName(FileRegion.class) + ')';

    private Runnable flushTask;//刷新任务
     /**
     * Create a new instance
     *创建nio字节通道
     * @param parent            the parent {@link Channel} by which this instance was created. May be {@code null}
     * @param ch                the underlying {@link SelectableChannel} on which it operates
     */
    protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
        //读操作事件为SelectionKey.OP_READ
        super(parent, ch, SelectionKey.OP_READ);
    }
}

来看实际写操作:

@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    int writeSpinCount = -1;//写操作自旋此数计数器

    boolean setOpWrite = false;
    for (;;) {
        //获取当前通道Outbound缓冲区中的当前待刷新消息
        Object msg = in.current();
        if (msg == null) {
            // Wrote all messages.
	    //清除通道选择key兴趣事件集中写操作事件
            clearOpWrite();
            // Directly return here so incompleteWrite(...) is not called.
            return;
        }

        if (msg instanceof ByteBuf) {//如果为字节buf
            ByteBuf buf = (ByteBuf) msg;
            int readableBytes = buf.readableBytes();
            if (readableBytes == 0) {
	        //可读字节数为0,则从通道Outbound缓存区中移除写请求
                in.remove();
                continue;
            }

            boolean done = false;
            long flushedAmount = 0;
            if (writeSpinCount == -1) {
	        //获取通道配置的写请求自旋次数
                writeSpinCount = config().getWriteSpinCount();
            }
            for (int i = writeSpinCount - 1; i >= 0; i --) {
	        //实际发送字节数据
                int localFlushedAmount = doWriteBytes(buf);
                if (localFlushedAmount == 0) {
		    //没有字节要发送
                    setOpWrite = true;
                    break;
                }
                //更新刷新字节计数器
                flushedAmount += localFlushedAmount;
                if (!buf.isReadable()) {
		    //如果buf不可读,则数据发送完毕
                    done = true;
                    break;
                }
            }
            //报告字节数据发送进度
            in.progress(flushedAmount);

            if (done) {
	        //发送完,则移除写请求
                in.remove();
            } else {
                // Break the loop and so incompleteWrite(...) is called.
                break;
            }
        } else if (msg instanceof FileRegion) {
	    //如果是文件Region消息
            FileRegion region = (FileRegion) msg;
            boolean done = region.transferred() >= region.count();
            if (!done) {
                long flushedAmount = 0;
                if (writeSpinCount == -1) {
                    writeSpinCount = config().getWriteSpinCount();
                }
                for (int i = writeSpinCount - 1; i >= 0; i--) {
		   //完成实际写操作
                    long localFlushedAmount = doWriteFileRegion(region);
                    if (localFlushedAmount == 0) {
                        setOpWrite = true;
                        break;
                    }
                    //更新刷新字节计数器
                    flushedAmount += localFlushedAmount;
                    if (region.transferred() >= region.count()) {
                        done = true;
                        break;
                    }
                }
                //更新数据发送进度
                in.progress(flushedAmount);
            }

            if (done) {
	        //发送完,则移除写请求
                in.remove();
            } else {
                // Break the loop and so incompleteWrite(...) is called.
                break;
            }
        } else {
            // Should not reach here.
            throw new Error();
        }
    }
    //完成写后继任务
    incompleteWrite(setOpWrite);
}

/**
 * Write a {@link FileRegion}
 *写一个文件region,待子类扩展
 * @param region        the {@link FileRegion} from which the bytes should be written
 * @return amount       the amount of written bytes
 */
protected abstract long doWriteFileRegion(FileRegion region) throws Exception;

/**
 * Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}.
 写一个字节buf,待子类扩展
 * @param buf           the {@link ByteBuf} from which the bytes should be written
 * @return amount       the amount of written bytes
 */
protected abstract int doWriteBytes(ByteBuf buf) throws Exception;


//完成写后继任务
protected final void incompleteWrite(boolean setOpWrite) {
    // Did not write completely.
    if (setOpWrite) {
        //发送数据完成,则重新添加写操作事件到选择key兴趣事件集
        setOpWrite();
    } else {
        //否则,继续刷新通道Outbound缓冲区中的写请求
        // Schedule flush again later so other tasks can be picked up in the meantime
        Runnable flushTask = this.flushTask;
        if (flushTask == null) {
            flushTask = this.flushTask = new Runnable() {
                @Override
                public void run() {
                    flush();
                }
            };
        }
        eventLoop().execute(flushTask);
    }
}
//添加写操作事件到选择key兴趣事件集
protected final void setOpWrite() {
    final SelectionKey key = selectionKey();
    // Check first if the key is still valid as it may be canceled as part of the deregistration
    // from the EventLoop
    // See https://github.com/netty/netty/issues/2104
    if (!key.isValid()) {
        return;
    }
    final int interestOps = key.interestOps();
    if ((interestOps & SelectionKey.OP_WRITE) == 0) {
        key.interestOps(interestOps | SelectionKey.OP_WRITE);
    }
}
//清除通道选择key兴趣事件集中写操作事件
protected final void clearOpWrite() {
    final SelectionKey key = selectionKey();
    // Check first if the key is still valid as it may be canceled as part of the deregistration
    // from the EventLoop
    // See https://github.com/netty/netty/issues/2104
    if (!key.isValid()) {
        return;
    }
    final int interestOps = key.interestOps();
    if ((interestOps & SelectionKey.OP_WRITE) != 0) {
        key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
    }
}

从上面可以看出,写通道Outbound缓冲区,即遍历刷新链上的写请求,如果写请求消息为
字节buf,则调用doWriteBytes完成实际数据发送操作,待子类扩展,如果写请求消息为文件Region,调用doWriteFileRegion完成实际数据发送操作,待子类扩展,数据发送,则更新通道的数据发送进度,并从刷新链上移除写请求;如果所有写请求发送完毕,则重新添加写操作事件到选择key兴趣事件集,否则继续刷新通道Outbound缓冲区中的写请求。

//过滤消息
@Override
protected final Object filterOutboundMessage(Object msg) {
    if (msg instanceof ByteBuf) {
        ByteBuf buf = (ByteBuf) msg;
	//如果消息为direct buf,直接返回消息
        if (buf.isDirect()) {
            return msg;
        }
        //否则包装buf为direct buf,这个newDirectBuffer方法在上一篇文章中以说
        return newDirectBuffer(buf);
    }

    if (msg instanceof FileRegion) {
        //如果是文件Region直接返回
        return msg;
    }

    throw new UnsupportedOperationException(
            "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}


再看其他方法:
//获取通道元数据
@Override
public ChannelMetadata metadata() {
    return METADATA;
}
/**
 * Shutdown the input side of the channel.
 关闭通道输入流
 */
protected abstract ChannelFuture shutdownInput();
//通道是否关闭输入流
protected boolean isInputShutdown0() {
    return false;
}
//创建底层通道操作类Unsafe
@Override
protected AbstractNioUnsafe newUnsafe() {
    return new NioByteUnsafe();
}

从newUnsafe方法,返回的为NioByteUnsafe
我们来看nio字节Unsafe:
protected class NioByteUnsafe extends AbstractNioUnsafe {
    //读操作
    @Override
    public final void read() {
        //获取通道配置,Channel管道,字节buf分配器,接受字节buf分配器Handle
        final ChannelConfig config = config();
        final ChannelPipeline pipeline = pipeline();
        final ByteBufAllocator allocator = config.getAllocator();
        final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
        allocHandle.reset(config);

        ByteBuf byteBuf = null;
        boolean close = false;
        try {
            do {
	        //分配一个字节buf
                byteBuf = allocHandle.allocate(allocator);
		//读取通道接收缓冲区数据
                allocHandle.lastBytesRead(doReadBytes(byteBuf));
                if (allocHandle.lastBytesRead() <= 0) {
                    // nothing was read. release the buffer.
		    //没有数据可读,则释放buf
                    byteBuf.release();
                    byteBuf = null;
                    close = allocHandle.lastBytesRead() < 0;
                    break;
                }
                //更新读取消息计数器
                allocHandle.incMessagesRead(1);
                readPending = false;
		//通知通道处理读取数据,触发Channel管道线的fireChannelRead事件
                pipeline.fireChannelRead(byteBuf);
                byteBuf = null;
            } while (allocHandle.continueReading());
            //读取操作完毕
            allocHandle.readComplete();
	    //触发Channel管道线的fireChannelReadComplete事件
            pipeline.fireChannelReadComplete();

            if (close) {
	        //如果通道关闭,关闭读操作
                closeOnRead(pipeline);
            }
        } catch (Throwable t) {
	    //处理读操作异常
            handleReadException(pipeline, byteBuf, t, close, allocHandle);
        } finally {
            // Check if there is a readPending which was not processed yet.
            // This could be for two reasons:
            // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
            // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
            //
            // See https://github.com/netty/netty/issues/2254
            if (!readPending && !config.isAutoRead()) {
	        //读操作完毕,且没有配置自动读,则从选择key兴趣集中移除读操作事件
                removeReadOp();
            }
        }
    }
     //关闭读操作
    private void closeOnRead(ChannelPipeline pipeline) {
        if (!isInputShutdown0()) {
            if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) {
	        //关闭通道输入流
                shutdownInput();
		//触发通道输入关闭事件
                pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
            } else {
                close(voidPromise());
            }
        } else {
	    //触发通道输入关闭没有数据可读取事件
            pipeline.fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
        }
    }
    //处理读操作异常
     private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
            RecvByteBufAllocator.Handle allocHandle) {
        if (byteBuf != null) {
            if (byteBuf.isReadable()) {
                readPending = false;
		//字节buf中还有数据没处理,则继续处理
                pipeline.fireChannelRead(byteBuf);
            } else {
                byteBuf.release();
            }
        }
	//读取操作完毕,触发Channel管道线的fireChannelReadComplete事件及fireExceptionCaught事件
        allocHandle.readComplete();
        pipeline.fireChannelReadComplete();
        pipeline.fireExceptionCaught(cause);
        if (close || cause instanceof IOException) { 
	   //如果需要,关闭读操作
            closeOnRead(pipeline);
        }
    } 
}

从上面可以看出,nio字节Unsafe读操作,从通道接收缓冲区读取数据,
通知通道处理读取数据,触发Channel管道线的fireChannelRead事件,待数据读取完毕,
触发Channel管道线的fireChannelReadComplete事件,如果在读数据的过程中,通道关闭,则
触发通道输入关闭事件(fireUserEventTriggered),如果在读数据的过程中,发生异常,则
读取缓存区中没有读完的数据,并通道通道处理剩余数据。

总结:

写通道Outbound缓冲区,即遍历刷新链上的写请求,如果写请求消息为字节buf,则调用doWriteBytes完成实际数据发送操作,待子类扩展,如果写请求消息为文件Region,调用doWriteFileRegion完成实际数据发送操作,待子类扩展,数据发送,则更新通道的数据发送进度,并从刷新链上移除写请求;如果所有写请求发送完毕,则重新添加写操作事件到选择key兴趣事件集,否则继续刷新通道Outbound缓冲区中的写请求。

nio字节Unsafe读操作,从通道接收缓冲区读取数据,通知通道处理读取数据,触发Channel管道线的fireChannelRead事件,待数据读取完毕,触发Channel管道线的fireChannelReadComplete事件,如果在读数据的过程中,通道关闭,则触发通道输入关闭事件(fireUserEventTriggered),如果在读数据的过程中,发生异常,则读取缓存区中没有读完的数据,并通道通道处理剩余数据。


附:

//ChannelMetadata
/**
 * Represents the properties of a {@link Channel} implementation.
 */
public final class ChannelMetadata {

    private final boolean hasDisconnect;//是否基于无连接的通信,fasle-》TCP,true-》UDP
    private final int defaultMaxMessagesPerRead;//每次读取,允许读取的最大消息数
    /**
     * Create a new instance
     *
     * @param hasDisconnect     {@code true} if and only if the channel has the {@code disconnect()} operation
     *                          that allows a user to disconnect and then call {@link Channel#connect(SocketAddress)}
     *                          again, such as UDP/IP.
     * @param defaultMaxMessagesPerRead If a {@link MaxMessagesRecvByteBufAllocator} is in use, then this value will be
     * set for {@link MaxMessagesRecvByteBufAllocator#maxMessagesPerRead()}. Must be {@code > 0}.
     */
    public ChannelMetadata(boolean hasDisconnect, int defaultMaxMessagesPerRead) {
        if (defaultMaxMessagesPerRead <= 0) {
            throw new IllegalArgumentException("defaultMaxMessagesPerRead: " + defaultMaxMessagesPerRead +
                                               " (expected > 0)");
        }
        this.hasDisconnect = hasDisconnect;
        this.defaultMaxMessagesPerRead = defaultMaxMessagesPerRead;
    }
    ...
}

//ChannelInputShutdownReadComplete
package io.netty.channel.socket;

/**
 * User event that signifies the channel's input side is shutdown, 
 and we tried to shut it down again. This typically
 * indicates that there is no more data to read.
 */
public final class ChannelInputShutdownReadComplete {
    public static final ChannelInputShutdownReadComplete INSTANCE = new ChannelInputShutdownReadComplete();

    private ChannelInputShutdownReadComplete() {
    }
}


//ChannelInputShutdownEvent
package io.netty.channel.socket;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;

/**
 * Special event which will be fired and passed to the
 * {@link ChannelInboundHandler#userEventTriggered(ChannelHandlerContext, Object)} methods once the input of
 * a {@link SocketChannel} was shutdown and the {@link SocketChannelConfig#isAllowHalfClosure()} method returns
 * {@code true}.
 */
public final class ChannelInputShutdownEvent {

    /**
     * Instance to use
     */
    @SuppressWarnings("InstantiationOfUtilityClass")
    public static final ChannelInputShutdownEvent INSTANCE = new ChannelInputShutdownEvent();

    private ChannelInputShutdownEvent() { }
}

文章评论

亲爱的项目经理,我恨你
亲爱的项目经理,我恨你
每天工作4小时的程序员
每天工作4小时的程序员
Web开发人员为什么越来越懒了?
Web开发人员为什么越来越懒了?
Google伦敦新总部 犹如星级庄园
Google伦敦新总部 犹如星级庄园
旅行,写作,编程
旅行,写作,编程
如何区分一个程序员是“老手“还是“新手“?
如何区分一个程序员是“老手“还是“新手“?
Java 与 .NET 的平台发展之争
Java 与 .NET 的平台发展之争
程序员周末都喜欢做什么?
程序员周末都喜欢做什么?
程序员眼里IE浏览器是什么样的
程序员眼里IE浏览器是什么样的
团队中“技术大拿”并非越多越好
团队中“技术大拿”并非越多越好
程序员必看的十大电影
程序员必看的十大电影
科技史上最臭名昭著的13大罪犯
科技史上最臭名昭著的13大罪犯
十大编程算法助程序员走上高手之路
十大编程算法助程序员走上高手之路
我是如何打败拖延症的
我是如何打败拖延症的
那些性感的让人尖叫的程序员
那些性感的让人尖叫的程序员
鲜为人知的编程真相
鲜为人知的编程真相
漫画:程序员的工作
漫画:程序员的工作
那些争议最大的编程观点
那些争议最大的编程观点
不懂技术不要对懂技术的人说这很容易实现
不懂技术不要对懂技术的人说这很容易实现
程序员都该阅读的书
程序员都该阅读的书
2013年美国开发者薪资调查报告
2013年美国开发者薪资调查报告
中美印日四国程序员比较
中美印日四国程序员比较
5款最佳正则表达式编辑调试器
5款最佳正则表达式编辑调试器
60个开发者不容错过的免费资源库
60个开发者不容错过的免费资源库
代码女神横空出世
代码女神横空出世
如何成为一名黑客
如何成为一名黑客
当下全球最炙手可热的八位少年创业者
当下全球最炙手可热的八位少年创业者
为啥Android手机总会越用越慢?
为啥Android手机总会越用越慢?
 程序员的样子
程序员的样子
程序员和编码员之间的区别
程序员和编码员之间的区别
聊聊HTTPS和SSL/TLS协议
聊聊HTTPS和SSL/TLS协议
为什么程序员都是夜猫子
为什么程序员都是夜猫子
10个帮程序员减压放松的网站
10个帮程序员减压放松的网站
做程序猿的老婆应该注意的一些事情
做程序猿的老婆应该注意的一些事情
程序员应该关注的一些事儿
程序员应该关注的一些事儿
我的丈夫是个程序员
我的丈夫是个程序员
什么才是优秀的用户界面设计
什么才是优秀的用户界面设计
10个调试和排错的小建议
10个调试和排错的小建议
看13位CEO、创始人和高管如何提高工作效率
看13位CEO、创始人和高管如何提高工作效率
我跳槽是因为他们的显示器更大
我跳槽是因为他们的显示器更大
一个程序员的时间管理
一个程序员的时间管理
老美怎么看待阿里赴美上市
老美怎么看待阿里赴美上市
Web开发者需具备的8个好习惯
Web开发者需具备的8个好习惯
程序员最害怕的5件事 你中招了吗?
程序员最害怕的5件事 你中招了吗?
初级 vs 高级开发者 哪个性价比更高?
初级 vs 高级开发者 哪个性价比更高?
要嫁就嫁程序猿—钱多话少死的早
要嫁就嫁程序猿—钱多话少死的早
总结2014中国互联网十大段子
总结2014中国互联网十大段子
Java程序员必看电影
Java程序员必看电影
程序员的一天:一寸光阴一寸金
程序员的一天:一寸光阴一寸金
软件开发程序错误异常ExceptionCopyright © 2009-2015 MyException 版权所有