常用的Handler-Netty心跳检测

发表时间:2017-09-20 14:58:02 浏览量( 29 ) 留言数( 0 )

学习目标:

1、了解LengthFieldBasedFrameDecoder

2、了解Netty的编解码


学习过程:

一、示例 IdleStateHandler

    我们可以通过这个IdleStateHandler实现一下的功能,如果客户端超过了指定的时间都没有往服务端发送任何读取操作或者服务端没有向客户端发送写信息时,那么我们可以尝试给客户端发送一个信息,监测链接是否还有效,如果没有效了,就可以关闭链接了,当然你也可以直接把链接关闭了。

    在实际环境中,检测空闲连接以及超时对于及时释放资源来说是至关重要的。由于这是一项常见的任务,Netty 特地为它提供了几个ChannelHandler 实现。

  • IdleStateHandler 当连接空闲时间太长时,将会触发一个IdleStateEvent 事件。然后,你可以通过在你的ChannelInboundHandler 中重写userEvent-Triggered()方法来处理该IdleStateEvent 事件。

  • ReadTimeoutHandler 如果在指定的时间间隔内没有收到任何的入站数据,则抛出一个Read-TimeoutException 并关闭对应的Channel。可以通过重写你的ChannelHandler 中的exceptionCaught()方法来检测该Read-TimeoutException。

  • WriteTimeoutHandler 如果在指定的时间间隔内没有任何出站数据写入,则抛出一个Write-TimeoutException 并关闭对应的Channel 。可以通过重写你的ChannelHandler 的exceptionCaught()方法检测该WriteTimeout-Exception。

    让我们看看在实践中使用得最多的IdleStateHandler 吧。我们实现一下的功能当使用通常的发送心跳消息到远程节点的方法时,如果在60 秒之内没有接收或者发送任何的数据,我们将如何得到通知;如果没有响应,则连接会被关闭。

   定义一个ChannelInitializer,以前都是使用匿名的内部类的,今天我们独立成为一个类,代码如下:

public class IdleStateHandlerInitializer extends ChannelInitializer<Channel> {

	@Override
	protected void initChannel(Channel ch) throws Exception {
		ChannelPipeline pipeline = ch.pipeline();
		//IdleStateHandler 将在被触发时发送一个IdleStateEvent 事件
		pipeline.addLast(new IdleStateHandler(0, 0, 10, TimeUnit.SECONDS));
		//将一个HeartbeatHandler添加到ChannelPipeline中
		pipeline.addLast(new HeartbeatHandler());

	}

	public static final class HeartbeatHandler extends ChannelInboundHandlerAdapter {
		
		//发送到远程节点的心跳消息
		private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled
				.unreleasableBuffer(Unpooled.copiedBuffer("HEARTBEAT", CharsetUtil.ISO_8859_1));

		//实现userEven t-Triggered()方法以发送心跳消息
		@Override
		public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
			System.out.println("===========userEventTriggered===============");
			if (evt instanceof IdleStateEvent) {
				//发送心跳消息,并在发送失败时关闭该连接,以保持链接信息可用。
				ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
				
				//直接关闭
				//ctx.close();
			} else {
				//不是IdleStateEvent事件,所以将它传递给下一个Channel-InboundHandler
				super.userEventTriggered(ctx, evt);
			}
		}

	}

}

服务端启动代码:

public class Server {
	private int port;

	public Server(int port) {
		this.port = port;
	}

	public void run() throws Exception {
		EventLoopGroup bossGroup = new NioEventLoopGroup(); 
		EventLoopGroup workerGroup = new NioEventLoopGroup();
		try {
			ServerBootstrap b = new ServerBootstrap(); 
			b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).handler(new LoggingHandler(LogLevel.INFO))
					.childHandler(new IdleStateHandlerInitializer()).option(ChannelOption.SO_BACKLOG, 128) 
					.childOption(ChannelOption.SO_KEEPALIVE, true); 
			// 绑定端口,开始接收进来的连接
			ChannelFuture f = b.bind(port).sync(); 
			// 等待服务器 socket 关闭 。
			// 在这个例子中,这不会发生,但你可以优雅地关闭你的服务器。
			f.channel().closeFuture().sync();
		} finally {
			workerGroup.shutdownGracefully();
			bossGroup.shutdownGracefully();
		}
	}

	public static void main(String[] args) throws Exception {
		int port;
		if (args.length > 0) {
			port = Integer.parseInt(args[0]);
		} else {
			port = 8084;
		}
		new Server(port).run();
	}

}

客户端可以使用之前的任何一个客户端的代码即可。


二、原理:

    IdleStateHandler这个类会根据你设置的超时参数的类型和值,循环去检测channelRead和write方法多久没有被调用了,如果这个时间超过了你设置的值,那么就会触发对应的事件,read触发read,write触发write,all触发all.

   初步地看下IdleStateHandler源码,先看下IdleStateHandler中的channelRead方法,这个方法会记录最近一次的访问时间,后面需要这个变量做判断的。

attcontent/6de43700-c8bc-4387-ab54-99a6b417f75a.png

   在看看initialize方法,这边会触发一个Task,不同的方式触发不同的Task,Schedule就是使用我们前面学过的线程池实现的: ctx.executor().schedule(task, delay, unit);

attcontent/256f455f-b1fb-4e6c-b877-0f904c1c7730.png

我们可以看一下ReaderIdleTimeoutTask,这个task是部分源码是这样的:

attcontent/b812d236-dff1-4b51-a162-24e28ee78153.png


  逻辑也非常简单。用当前时间减去最后一次channelRead方法调用的时间,假如这个结果是6s,说明最后一次调用channelRead已经是6s之前的事情了,你设置的是5s,那么nextDelay则为-1,说明超时了,那么channelIdle(ctx,event)则会触发userEventTriggered方法。所以这个逻辑就时有一个定时器定时的检测而已。