在前一篇中Netty4.1 Http开发入门(一)服务端,实现了一个简单的Http Server,这次来实现一个Http客户端。
为了方便进行测试,这次把这个Http客户端整合到SpringBoot里边,调用流程是:PostMan -> SpringBoot Controller -> Http客户端 -> Http Server
简单Http连接
每次请求:客户端创建到服务端的连接,发请求,收相应,然后关闭连接。
笔者这里在SpringBoot中调用客户端使用的是同步模型,但我们知道Netty本身是异步事件驱动模型,所以需要一点线程同步技巧,笔者思路是做一个Response的容器类,即SynResponse,里边用Latch(1)做一个同步锁,Controller里边已同步方式发送请求给服务端之后,通过SynResponse在Latch上await获取响应的Content,等Netty回调Handler时对Latch countDown,这样Controller就会继续执行并把Response Content返回给前端了。代码如下:
SynResponse
public class SynResponse<T> {
private T responseContent;
private CountDownLatch latch = new CountDownLatch(1);
public T getResponseContent() throws InterruptedException {
latch.await();
return responseContent;
}
public void setResponseContent(T responseContent) {
this.responseContent = responseContent;
latch.countDown();
}
}
SimpleNettyHttpClient:
@Slf4j
@Component
public class SimpleNettyHttpClient {
private static final int _1M = 1024 * 1024;
private EventLoopGroup workerGroup;
@PostConstruct
public void init() {
workerGroup = new NioEventLoopGroup();
}
public String sendRequest(String url, String requestBody) throws Exception {
Bootstrap boot = new Bootstrap();
boot.group(workerGroup);
boot.channel(NioSocketChannel.class);
boot.remoteAddress("127.0.0.1", 8080);
SynResponse<String> synResponse = new SynResponse<>();
boot.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder());
ch.pipeline().addLast(new HttpClientCodec());
ch.pipeline().addLast(new HttpObjectAggregator(_1M));
ch.pipeline().addLast(new TestHandler(synResponse));
}
});
ChannelFuture f = boot.connect().sync();
log.info("已连接");
URI uri = new URI(url);
DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET,
uri.toASCIIString(), Unpooled.wrappedBuffer(requestBody.getBytes("UTF-8")));
request.headers().set(HttpHeaderNames.CONTENT_LENGTH, request.content().readableBytes());
log.info("组装Request");
Channel channel = f.channel();
channel.writeAndFlush(request);
log.info("写Request");
channel.closeFuture().sync();
log.info("关闭Channel");
//workerGroup.shutdownGracefully();
log.info("同步阻塞获取Response Content");
return synResponse.getResponseContent();
}
public static class TestHandler extends ChannelInboundHandlerAdapter {
private SynResponse<String> synResponse;
public TestHandler(SynResponse<String> synResponse) {
this.synResponse = synResponse;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
FullHttpResponse response = (FullHttpResponse) msg;
String content = response.content().toString(CharsetUtil.UTF_8);
log.info("收到服务端response:{}", content);
synResponse.setResponseContent(content);
log.info("Response content放入result");
}
}
}
Controller:
@Autowired
private SimpleNettyHttpClient simpleNettyClient;
@RequestMapping(value = "/postStringSimple", method = RequestMethod.POST)
public String postStringSimple(@RequestBody String data) throws Exception {
String res = simpleNettyClient.sendRequest("http://127.0.0.1:8080/test", data);
logger.info("Controller返回:{}", res);
return res;
}
Http连接池
上面的简单Http连接是调用线程每次创建一个连接,在连接的生命周期内,都是为这一个调用线程所用。而如果要使用连接池模式,那么连接在生命周期内是可能被多个线程重复使用的,多个线程通过1个连接发出去的请求,在收到服务端响应、Netty回调Handler时必须知道这个Response对应的是哪个Request,笔者这里采用的是对每个连接Channel在池里创建的时候绑定一个Map,用来存放<RequestID, SynResponse>这样的一个KV对,即发送请求的时候生成唯一的Request存入Map,Handler收到Response时把响应内容封装为SynContent并与RequestID建立关联放入Map,为了防止内存溢出,需要在本次请求使用完之后remove掉对应的key值。连接池的实现笔者使用的是Netty提供的实现,即ChannelPoolMap
和ChannelPoolHandler
两个接口。代码如下:
PoolingNettyClient 使用ChannelPoolMap接口来做连接池的初始化。对外提供sendRequest方法:
@Slf4j
@Component
public class PoolingNettyClient {
//key是remote地址,每个remote地址对应一个FixedChannelPool
private ChannelPoolMap<String, FixedChannelPool> poolMap;
@PostConstruct
public void init() {
log.info("pooling netty client 初始化...");
Bootstrap boot = new Bootstrap();
boot.channel(NioSocketChannel.class);
boot.group(new NioEventLoopGroup());
boot.remoteAddress("127.0.0.1", 8080);
poolMap = new AbstractChannelPoolMap<String, FixedChannelPool>() {
ChannelPoolHandler handler = new PoolingChannelHandler();
@Override
protected FixedChannelPool newPool(String key) {
return new FixedChannelPool(boot, handler, 50);
}};
}
//发送请求
public String sendRequest(String url, String requestBody) throws InterruptedException, ExecutionException {
log.info("发送请求,url:{},requestbody:{}", url, requestBody);
RequestWraper requestWraper = new RequestWraper(url, requestBody);
String requestId = requestWraper.getRequest().headers().get("RequestID");
//AttributeKey<Map> resMapKey = AttributeKey.valueOf("ResponseMap");
FixedChannelPool pool= poolMap.get("127.0.0.1");
Future<Channel> channelAcquireDone = pool.acquire().sync(); //从池中获取连接
Channel channel = channelAcquireDone.get();
log.info("已从连接池获取连接...");
//准备response容器,并放入ResponseMap
Map<String, SynResponse<String>> responseMap = getResponseMap(channel);
log.info(JSON.toJSONString(responseMap));
SynResponse<String> synResponse = new SynResponse<>();
if(responseMap != null)
responseMap.put(requestId, synResponse);
//log.info(JSON.toJSONString(getResponseMap(channel)));
//写请求
log.info("写请求...");
ChannelFuture writeRequestDone = channel.writeAndFlush(requestWraper.getRequest()).sync();
if(writeRequestDone.isSuccess()) {
log.info("requestData已发送,ChannelID:{}, requestId:{}", channel.id(), requestId);
}else {
log.error("requestData发送失败,ChannelID:{}, requestId:{}", channel.id(), requestId);
}
pool.release(channel); //把连接还回池
log.info("把连接还回池...");
//从Response容器阻塞获取content,返回前端
return synResponse.getResponseContent();
}
private Map<String, SynResponse<String>> getResponseMap(Channel ch) {
AttributeKey<Map<String, SynResponse<String>>> resMapKey = AttributeKey.valueOf("ResponseMap");
return ch.attr(resMapKey).get();
}
}
PoolingChannelHandler 连接池里每个channel的handler,处理channel相应的各个事件create、acquire、release等:
@Slf4j
public class PoolingChannelHandler implements ChannelPoolHandler{
private static final int _1M = 1024 * 1024;
@Override
public void channelReleased(Channel ch) throws Exception {
log.info("释放连接...");
}
@Override
public void channelAcquired(Channel ch) throws Exception {
log.info("获取到连接...");
}
@Override
public void channelCreated(Channel ch) throws Exception {
log.info("已创建连接...");
//channel上绑定个Map,用来放响应result
AttributeKey<Map> key = AttributeKey.valueOf("ResponseMap");
ch.attr(key).set(new ConcurrentHashMap<String, SynResponse<String>>());
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringEncoder())
.addLast(new HttpClientCodec())
.addLast(new HttpObjectAggregator(_1M))
.addLast(new ResponseHandler());
}
}
RequestWraper 请求包装类,包装了一个DefaultFullHttpRequest
,设置了RequestID Header ,并且设置Content-Length Header,TCP层面属于指定长度的传输:
public class RequestWraper {
private DefaultFullHttpRequest request;
public RequestWraper(String url, String requestBody) {
String requestId = genRequestId();
request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, url,
Unpooled.wrappedBuffer(requestBody.getBytes(CharsetUtil.UTF_8)));
request.headers().add("RequestID", requestId);
request.headers().add(HttpHeaderNames.CONTENT_LENGTH, request.content().readableBytes());
}
private String genRequestId() {
return UUID.randomUUID().toString().replace("-", "");
}
public DefaultFullHttpRequest getRequest() {
return request;
}
}
ResponseHandler 业务逻辑Handler,这里用来处理服务端返回的Response:
@Slf4j
public class ResponseHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("读Response");
FullHttpResponse response = (FullHttpResponse) msg;
String requestId = response.headers().get("RequestID");
ByteBuf content = response.content();
String responseContent = content.toString(CharsetUtil.UTF_8);
log.info("收到服务端response:{}", content.toString(CharsetUtil.UTF_8));
//将Response content放入到ReponseMap里的SynResponse容器中
Channel ch = ctx.channel();
AttributeKey<Map> key = AttributeKey.valueOf("ResponseMap");
Map responseMap = ch.attr(key).get();
SynResponse<String> synResponse = (SynResponse) responseMap.remove(requestId);
log.info("从ResponseMap获得SynResponse, 准备写Content");
synResponse.setResponseContent(responseContent);
log.info("已向SynResponse写入content");
}
}
这里提一句,上面的Netty连接池属于“连接复用”且是“独享模式”,连接取用之后放回池里给其他线程使用,这是复用,同一个时刻一个连接只给一个线程所独占使用,这是独享,JDBC,Apache HttpClient所用的连接池都是这种模式。
连接“共享模式”的例子就是Redis的Lettuce连接池,同一个时刻一个连接是被多个线程一起使用的。Redis Server处理命令用的是一个单线程,而TCP协议本身保证顺序性,这样一来每个连接上的发出的请求的顺序一定跟返回的响应的顺序就一致了,所以这种情况下多个线程共用一个连接是线程安全的。
我们上面的连接池如果要改成这种模式的实现,也是需要RequestId来保证响应与请求的对应关系,因为服务端是多线程乱序处理的,无法保证同一个连接上响应返回的顺序与请求顺序一致。
btw,我们上面是为了转同步而不得不用的RequestId,本来独享模式是不需要的,注意体会一下。
总结
-
复用EventLoopGroup资源
EventLoopGroup
属于全局资源,不要每个连接请求都去创建。 -
如果用同步的方式使用Netty
Netty入站事件用的是各个Handler的回调,出站事件反应到代码上是Future类型。Netty的这种事件驱动异步编程模型如果要用在基于Servlet模型的Spring MVC开发里边,需要做一些同步技巧。
-
Transfer-Encoding: chunked和Content-Length
Http底层基于TCP,数据传输使用的是"长度声明+变长包"传输,有3种情况:
- 短链接,使用连接关闭的动作告知对端本次传输结束。
- 长连接,提前计算好本次传输数据长度,在Header里边添加
Content-Length:length
包头来告知对端本次传输的数据大小。 - 长连接,没法提前计算本次数据长度或基于性能考虑不想这么做,比如gzip压缩的场景,不等到全部压缩结束去计算压缩后的大小然后添加到头里,而是采用所谓chunked方式,Http Header添加
Transfer-Encoding: chunked
,告知对端本次将采用分块传输的方式,然后将数据分成若干个chunk传输,每个chunk的第1行用16进制声明本chunk的字节长度。
-
HttpObjectAggregator聚合
在Netty里可以使用
HttpObjectAggregator
来聚合上面这种chunked传输的情况,它会自动聚合成一个完整的FullHttpRequest
或FullHttpResponse
提供给Pipeline上的下一个Handler,开发人员不必在自己的Handler里手工去聚合HttpObject
了。 -
Netty4自带的连接池
Netty 4提供了自带的连接池实现,主要是使用
ChannelPool
、ChannelPoolHandler
、ChannelPoolMap
三个接口,ChannelPoolMap用来按照比如对端地址等作为key、ChannelPool实现作为value的映射关系,因为客户端可能会要面对多个对端建立多个连接池。ChannelPoolHandler是绑定到ChannelPool上的,负责池中每个连接的事件的处理、比如create、accquire、release等。ChannelPool就是具体的一个连接池了,Netty也提供了其一个实现FixedChannelPool
。Netty 4的自带的连接池实现,本质是连接独占模式,1个连接同一时刻只被1个线程使用,而不是连接共享(参考Lettuce连接池)
参考:
基于 Netty 如何实现高性能的 HTTP Client 的连接池 - 云+社区 - 腾讯云 (tencent.com)
Netty客户端连接池ChannelPool应用 【支持https请求】 - harara-小念 - 博客园 (cnblogs.com)
Netty Client实战——高并发连接池方案 | EGNOD'S BLOG (itboyer.github.io)
(5条消息) netty实战-netty client连接池设计Sam_Deep_Thinking-CSDN博客netty客户端连接池
Netty自带连接池的使用 - Ruthless - 博客园 (cnblogs.com)