Netty实现Http协议

发表时间:2017-09-13 14:59:01 浏览量( 41 ) 留言数( 0 )

学习目标:

1、了解LengthFieldBasedFrameDecoder

2、了解Netty的编解码


学习过程:

    HTTP 是基于请求/响应模式的:客户端向服务器发送一个HTTP 请求,然后服务器将会返回一个HTTP 响应。Netty 提供了多种编码器和解码器以简化对这个协议的使用。

attcontent/eb388571-bee9-4c2a-81f9-1e005bae8751.png

attcontent/9ae31c46-84f7-4e22-bda4-2562d3347154.png

    一个HTTP 请求/响应可能由多个数据部分组成,并且它总是以一个LastHttpContent 部分作为结束。FullHttpRequest 和FullHttpResponse 消息是特殊的子类型,分别代表了完整的请求和响应。

  • HttpRequestEncoder 将HttpRequest、HttpContent 和LastHttpContent 消息编码为字节

  • HttpResponseEncoder 将HttpResponse、HttpContent 和LastHttpContent 消息编码为字节

  • HttpRequestDecoder 将字节解码为HttpRequest、HttpContent 和LastHttpContent 消息

  • HttpResponseDecoder 将字节解码为HttpResponse、HttpContent 和LastHttpContent 消息

我们可以直接使用HttpClientCodec和HttpServerCodec,你可以看一下下面的代码定义:

HttpClientCodec extends CombinedChannelDuplexHandler<HttpResponseDecoder, HttpRequestEncoder>

HttpServerCodec extends CombinedChannelDuplexHandler<HttpRequestDecoder, HttpResponseEncoder>



2、聚合HTTP 消息

    在ChannelInitializer 将ChannelHandler 安装到ChannelPipeline 中之后,你便可以处理不同类型的HttpObject 消息了。但是由于HTTP 的请求和响应可能由许多部分组成,因此你需要聚合它们以形成完整的消息。为了消除这项繁琐的任务,Netty 提供了一个聚合器,它可以将多个消息部分合并为FullHttpRequest 或者FullHttpResponse 消息。通过这样的方式,你将总是看到完整的消息内容。

    由于消息分段需要被缓冲,直到可以转发一个完整的消息给下一个ChannelInbound-Handler,所以这个操作有轻微的开销。其所带来的好处便是你不必关心消息碎片了。引入这种自动聚合机制只不过是向ChannelPipeline 中添加另外一个ChannelHandler罢了。

    即通过它可以把 HttpMessage 和 HttpContent 聚合成一个 FullHttpRequest 或者 FullHttpResponse (取决于是处理请求还是响应),而且它还可以帮助你在解码时忽略是否为“块”传输方式。

    因此,在解析 HTTP POST 请求时,请务必在 ChannelPipeline 中加上 HttpObjectAggregator。

pipeline.addLast("aggregator",new HttpObjectAggregator(512 * 1024));


3、HTTP 压缩

    当使用HTTP 时,建议开启压缩功能以尽可能多地减小传输数据的大小。虽然压缩会带来一些CPU 时钟周期上的开销,但是通常来说它都是一个好主意,特别是对于文本数据来说。Netty 为压缩和解压缩提供了ChannelHandler 实现,它们同时支持gzip 和deflate 编码。

HTTP 请求的头部信息

客户端可以通过提供以下头部信息来指示服务器它所支持的压缩格式:

GET /encrypted-area HTTP/1.1

Host: www.example.com

Accept -Encoding: gzip, deflate

然而,需要注意的是,服务器没有义务压缩它所发送的数据。


完整的代码如下:

public class HttpPipelineInitializer extends ChannelInitializer<Channel> {
	
	private final boolean client;

	public HttpPipelineInitializer(boolean client) {
		this.client = client;
	}

	@Override
	protected void initChannel(Channel ch) throws Exception {
		ChannelPipeline pipeline = ch.pipeline();
		if (client) {
			//pipeline.addLast("decoder", new HttpResponseDecoder());
			//pipeline.addLast("encoder", new HttpRequestEncoder());
			pipeline.addLast("codec", new HttpClientCodec());
			pipeline.addLast("decompressor", new HttpContentDecompressor());
			pipeline.addLast(new HttpClientHandler());//业务处理
		} else {
			//pipeline.addLast("decoder", new HttpRequestDecoder());
			//pipeline.addLast("encoder", new HttpResponseEncoder());
			pipeline.addLast("codec", new HttpServerCodec());
			pipeline.addLast("compressor", new HttpContentCompressor());
			pipeline.addLast(new HttpServerHandler());//业务处理
		}
		
		//将一个HttpMessage 和跟随它的多个HttpContent 聚合为单个FullHttpRequest 或者FullHttpResponse(取决于它是被用来处理请求还是响应)。
        //安装了这个之后,ChannelPipeline 中的下一个ChannelHandler 将只会收到完整的HTTP 请求或响应
		pipeline.addLast("aggregator",new HttpObjectAggregator(512 * 1024));  //聚合HTTP 消息
		
	}
}


服务端的业务处理类:

/**
 * 服务端的业务处理
 * 
 * @author liubao
 *
 */
public class HttpServerHandler extends ChannelInboundHandlerAdapter {
	private HttpPostRequestDecoder decoder;

	private HttpRequest request;
	private static final HttpDataFactory factory = new DefaultHttpDataFactory(DefaultHttpDataFactory.MAXSIZE);
	private HttpHeaders headers;
	private FullHttpRequest fullRequest;

	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws UnsupportedEncodingException {

		System.out.println("channelRead");

		if (msg instanceof HttpRequest) {

			request = (HttpRequest) msg;
			headers = request.headers();
			String uri = request.uri();
			System.out.println("Uri:" + uri);

			HttpMethod method = request.method();
			if (method.equals(HttpMethod.GET)) {
				QueryStringDecoder queryDecoder = new QueryStringDecoder(uri, Charsets.UTF_8);
				Map<String, List<String>> uriAttributes = queryDecoder.parameters();
				// 获得所有的get方法里面的uri后面的参数 ?key=value
				for (Map.Entry<String, List<String>> attr : uriAttributes.entrySet()) {
					for (String attrVal : attr.getValue()) {
						System.out.println(attr.getKey() + "=" + attrVal);
					}
				}
			} else if (method.equals(HttpMethod.POST)) {

				// POST请求,由于你需要从消息体中获取数据,因此有必要把msg转换成FullHttpRequest
				fullRequest = (FullHttpRequest) msg;

				try {
					// 根据不同的 Content_Type 处理 body 数据
					dealWithContentType();
				} catch (Exception e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			} else {
				// HttpRestful的其他方法以后可以再扩展
			}

		}

		if (msg instanceof HttpContent) {
			HttpContent content = (HttpContent) msg;
			ByteBuf buf = content.content();
			System.out.println(buf.toString(io.netty.util.CharsetUtil.UTF_8));
			buf.release();

		}

		// 返回给客户端端
		String res = "你好啊。";
		byte [] resByte=res.getBytes("UTF-8");
		FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK,
				Unpooled.wrappedBuffer(resByte));
		response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
		response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());

		boolean keepAlive = HttpUtil.isKeepAlive(request);

		if (keepAlive) {
			response.headers().set(HttpHeaderNames.CONTENT_LENGTH, resByte.length);
			response.headers().set(HttpHeaderNames.CONNECTION,HttpHeaderValues.KEEP_ALIVE);
		}
		ChannelFuture future = ctx.writeAndFlush(response);
		ctx.flush();

		if (!keepAlive) {
			future.addListener(ChannelFutureListener.CLOSE);
		}

	}

	@Override
	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
		ctx.flush();
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
		cause.printStackTrace();
		ctx.close();
	}

	private void dealWithContentType() throws Exception {
		// 获得Content-Type
		String typeStr = headers.get(HttpHeaderNames.CONTENT_TYPE).toString();
		String[] list = typeStr.split(";");
		String contentType = list[0];

		if (contentType.equals("application/json")) { // 可以使用HttpJsonDecoder
			String jsonStr = fullRequest.content().toString(Charsets.UTF_8);

			System.out.println("jsonStr=" + jsonStr);

		} else if (contentType.equals("application/x-www-form-urlencoded")) {
			// 方式一:使用 QueryStringDecoder
			String jsonStr = fullRequest.content().toString(Charsets.UTF_8);
			QueryStringDecoder queryDecoder = new QueryStringDecoder(jsonStr, false);
			Map<String, List<String>> uriAttributes = queryDecoder.parameters();
			for (Map.Entry<String, List<String>> attr : uriAttributes.entrySet()) {
				for (String attrVal : attr.getValue()) {
					System.out.println(attr.getKey() + "=" + attrVal);
				}
			}

		} else if (contentType.equals("multipart/form-data")) {
			// 用于文件上传

		} else {
			// 其他
		}
	}


}


客户端其实使用浏览器也可以的,也可以自己 定义一个客户端的处理类:

public class HttpClientHandler extends ChannelInboundHandlerAdapter {
	
	@Override
	public void channelActive(ChannelHandlerContext ctx) {
		
		System.out.println("ClientHandler");
		
		
		try {
			URI uri = new URI("http://127.0.0.1:80");
			String msg = "Are you ok?";
			DefaultFullHttpRequest request = new DefaultFullHttpRequest(HTTP_1_1, HttpMethod.GET,
			        uri.toASCIIString(), Unpooled.wrappedBuffer(msg.getBytes("UTF-8")));

			// 构建http请求
			request.headers().set(HttpHeaderNames.HOST, "127.0.0.1");
			//request.headers().set(HttpHeaderNames.CONNECTION,HttpHeaderValues.KEEP_ALIVE); //保持长连接
			request.headers().set(HttpHeaderNames.CONNECTION,HttpHeaderValues.CLOSE);//不保持长连接
			request.headers().set(HttpHeaderNames.CONTENT_LENGTH, request.content().readableBytes());
			// 发送http请求
			ctx.channel().write(request);
			ctx.channel().flush();
			//ctx.channel().closeFuture().sync();
		} catch (UnsupportedEncodingException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (URISyntaxException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} 
		
	}

	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) {
		if (msg instanceof HttpResponse) {
			
		}
		if (msg instanceof HttpContent) {
			HttpContent content = (HttpContent) msg;
			ByteBuf buf = content.content();
			System.out.println(buf.toString(io.netty.util.CharsetUtil.UTF_8));
			buf.release();
			
		}
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
		cause.printStackTrace();
		ctx.close();
	}
}