处理基于流的传输

发表时间:2017-09-13 14:55:17 浏览量( 27 ) 留言数( 0 )

学习目标:

1、了解基于流的传输的示例

2、会写入一个简单Netty的示例


学习过程:

    这是今天最重要的一节课了。由于TCP粘包和拆包的问题,所以在通讯的时候会有很多问题,下面我们简单说一下

    基于流的传输比如 TCP/IP, 接收到数据是存在 socket 接收的 buffer 中。不幸的是,基于流的传输并不是一个数据包队列,而是一个字节队列。意味着,即使你发送了2个独立的数据包,操作系统也不会作为2个消息处理而仅仅是作为一连串的字节而言。因此这是不能保证你远程写入的数据就会准确地读取。举个例子,让我们假设操作系统的 TCP/TP 协议栈已经接收了3个数据包:    

attcontent/add30ac7-c187-43e8-9ffe-42198b00c3e9.png

    由于基于流传输的协议的这种普通的性质,在你的应用程序里读取数据的时候会有很高的可能性被分成下面的片段

attcontent/492a29d3-9f93-4f40-b40f-938acaa06fa2.png

    因此,一个接收方不管他是客户端还是服务端,都应该把接收到的数据整理成一个或者多个更有意思并且能够让程序的业务逻辑更好理解的数据。在上面的例子中,接收到的数据应该被构造成下面的格式:

attcontent/18782870-ef92-4a03-9c7d-4249517cebe4.png


一、解决办法一

    回到 TIME 客户端例子。同样也有类似的问题。一个32位整型是非常小的数据,他并不见得会被经常拆分到到不同的数据段内。然而,问题是他确实可能会被拆分到不同的数据段内,并且拆分的可能性会随着通信量的增加而增加。

    最简单的方案是构造一个内部的可积累的缓冲,直到4个字节全部接收到了内部缓冲。下面的代码修改了 TimeClientHandler 的实现类修复了这个问题

public class TimeClientHandlerNew extends ChannelInboundHandlerAdapter {
	private ByteBuf buf;

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
    	System.out.println("handlerAdded");
        buf = ctx.alloc().buffer(4); // (1)
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
    	System.out.println("handlerRemoved");
        buf.release(); // (1)
        buf = null;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg;
        buf.writeBytes(m); // (2)
        m.release();

        if (buf.readableBytes() >= 4) { // (3)
            long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

    1.ChannelHandler 有2个生命周期的监听方法:handlerAdded()和 handlerRemoved()。你可以完成任意初始化任务只要他不会被阻塞很长的时间。

    2.首先,所有接收的数据都应该被累积在 buf 变量里。

    3.然后,处理器必须检查 buf 变量是否有足够的数据,在这个例子中是4个字节,然后处理实际的业务逻辑。否则,Netty 会重复调用channelRead() 当有更多数据到达直到4个字节的数据被积累。


二、解决方法二

    尽管第一个解决方案已经解决了 TIME 客户端的问题了,但是修改后的处理器看起来不那么的简洁,想象一下如果由多个字段比如可变长度的字段组成的更为复杂的协议时,你的 ChannelInboundHandler 的实现将很快地变得难以维护。

    正如你所知的,你可以增加多个 ChannelHandler 到ChannelPipeline ,因此你可以把一整个ChannelHandler 拆分成多个模块以减少应用的复杂程度,比如你可以把TimeClientHandler 拆分成2个处理器:

    TimeDecoder 处理数据拆分的问题

    TimeClientHandler 原始版本的实现

幸运地是,Netty 提供了一个可扩展的类,帮你完成 TimeDecoder 的开发。

public class TimeDecoder extends ByteToMessageDecoder {

	@Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2)
        if (in.readableBytes() < 4) {
            return; // (3)
        }

        out.add(in.readBytes(4)); // (4)
    }
}

    1.ByteToMessageDecoder 是 ChannelInboundHandler 的一个实现类,他可以在处理数据拆分的问题上变得很简单。

    2.每当有新数据接收的时候,ByteToMessageDecoder 都会调用 decode() 方法来处理内部的那个累积缓冲。

    3.Decode() 方法可以决定当累积缓冲里没有足够数据时可以往 out 对象里放任意数据。当有更多的数据被接收了 ByteToMessageDecoder 会再一次调用 decode() 方法。

    4.如果在 decode() 方法里增加了一个对象到 out 对象里,这意味着解码器解码消息成功。ByteToMessageDecoder 将会丢弃在累积缓冲里已经被读过的数据。请记得你不需要对多条消息调用 decode(),ByteToMessageDecoder 会持续调用 decode() 直到不放任何数据到 out 里。

    现在我们有另外一个处理器插入到 ChannelPipeline 里,TimeClientHandler还是用回上一节课的,我们应该在 TimeClient 里修改 ChannelInitializer 的实现:

b.handler(new ChannelInitializer<SocketChannel>() {
				@Override
				public void initChannel(SocketChannel ch) throws Exception {
					 //ch.pipeline().addLast( new TimeClientHandlerNew());
					ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
				}
			});


Decoder还有另外一种实现方法

public class TimeDecoder2 extends ReplayingDecoder<Void> {
    @Override
    protected void decode(
            ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        out.add(in.readBytes(4));
    }
}

    ReplayingDecoder是byte-to-message解码的一种特殊的抽象基类,读取缓冲区的数据之前须要检查缓冲区是否有足够的字节。使用ReplayingDecoder就无需自己检查;若ByteBuf中有足够的字节,则会正常读取。若没有足够的字节则会停止解码。

    也正由于这种包装使得ReplayingDecoder带有一定的局限性。

    不是全部的操作都被ByteBuf支持,假设调用一个不支持的操作会抛出DecoderException。

    ByteBuf.readableBytes()大部分时间不会返回期望值

    假设你能忍受上面列出的限制。相比ByteToMessageDecoder,你可能更喜欢ReplayingDecoder。

    在满足需求的情况下推荐使用ByteToMessageDecoder。由于它的处理比較简单,没有ReplayingDecoder实现的那么复杂。ReplayingDecoder继承与ByteToMessageDecoder,所以他们提供的接口是同样的