zoukankan      html  css  js  c++  java
  • Java io.netty.util.ReferenceCountUtil 代码实例

    原文:https://www.helplib.com/Java_API_Classes/article_64580

    以下是展示如何使用io.netty.util.ReferenceCountUtil的最佳示例。 我们使用了代码质量辨别算法从开源项目中提取出了最佳的优秀示例。

    实例 1


    private static void testPerformOpeningHandshake0(boolean subProtocol) {
        EmbeddedChannel ch = new EmbeddedChannel(
                new HttpObjectAggregator(42), new HttpRequestDecoder(), new HttpResponseEncoder());
        FullHttpRequest req = ReferenceCountUtil.releaseLater(
                new DefaultFullHttpRequest(HTTP_1_1, HttpMethod.GET, "/chat"));
        req.headers().set(Names.HOST, "server.example.com");
        req.headers().set(Names.UPGRADE, WEBSOCKET.toLowerCase());
        req.headers().set(Names.CONNECTION, "Upgrade");
        req.headers().set(Names.SEC_WEBSOCKET_KEY, "dGhlIHNhbXBsZSBub25jZQ==");
        req.headers().set(Names.SEC_WEBSOCKET_ORIGIN, "http://example.com");
        req.headers().set(Names.SEC_WEBSOCKET_PROTOCOL, "chat, superchat");
        req.headers().set(Names.SEC_WEBSOCKET_VERSION, "13");
        if (subProtocol) {
            new WebSocketServerHandshaker13(
                    "ws://example.com/chat", "chat", false, Integer.MAX_VALUE).handshake(ch, req);
        } else {
            new WebSocketServerHandshaker13(
                    "ws://example.com/chat", null, false, Integer.MAX_VALUE).handshake(ch, req);
        }
        ByteBuf resBuf = (ByteBuf) ch.readOutbound();
        EmbeddedChannel ch2 = new EmbeddedChannel(new HttpResponseDecoder());
        ch2.writeInbound(resBuf);
        HttpResponse res = (HttpResponse) ch2.readInbound();
        Assert.assertEquals(
                "s3pPLMBiTxaQ9kYGzzhZRbK+xOo=", res.headers().get(Names.SEC_WEBSOCKET_ACCEPT));
        if (subProtocol) {
            Assert.assertEquals("chat", res.headers().get(Names.SEC_WEBSOCKET_PROTOCOL));
        } else {
            Assert.assertNull(res.headers().get(Names.SEC_WEBSOCKET_PROTOCOL));
        }
        ReferenceCountUtil.release(res);
    }

    实例 2


    @Test
    public void testHttpUpgradeRequest() throws Exception {
        EmbeddedChannel ch = createChannel(new MockOutboundHandler());
        ChannelHandlerContext handshakerCtx = ch.pipeline().context(WebSocketServerProtocolHandshakeHandler.class);
        writeUpgradeRequest(ch);
        assertEquals(SWITCHING_PROTOCOLS, ReferenceCountUtil.releaseLater(responses.remove()).getStatus());
        assertNotNull(WebSocketServerProtocolHandler.getHandshaker(handshakerCtx));
    }

    实例 3


    @Override
    @SuppressWarnings("unchecked")
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    	Class<?> messageClass = msg.getClass();
    	if (!handshaker.isHandshakeComplete()) {
    		ctx.pipeline().remove(HttpObjectAggregator.class);
    		handshaker.finishHandshake(ctx.channel(), (FullHttpResponse) msg);
    		httpChannel = new NettyHttpChannel(tcpStream, new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/")) {
    			@Override
    			protected void doSubscribeHeaders(Subscriber<? super Void> s) {
    				Publishers.<Void>empty().subscribe(s);
    			}
    		};
    		NettyHttpWSClientHandler.super.channelActive(ctx);
    		super.channelRead(ctx, msg);
    		return;
    	}
    	if (TextWebSocketFrame.class.isAssignableFrom(messageClass)) {
    		try {
    			//don't inflate the String bytes now
    			channelSubscriber.onNext(new StringBuffer(((TextWebSocketFrame) msg).content().nioBuffer()));
    		} finally {
    			ReferenceCountUtil.release(msg);
    		}
    	} else if (CloseWebSocketFrame.class.isAssignableFrom(messageClass)) {
    		ctx.close();
    	} else {
    		doRead(ctx, ((WebSocketFrame)msg).content());
    	}
    }

    实例 4


    @SuppressWarnings("unchecked")
    protected final void doRead(ChannelHandlerContext ctx, Object msg) {
    	try {
    		if (null == channelSubscriber || msg == Unpooled.EMPTY_BUFFER) {
    			ReferenceCountUtil.release(msg);
    			return;
    		}
    		NettyBuffer buffer = NettyBuffer.create(msg);
    		try {
    			channelSubscriber.onNext(buffer);
    		}
    		finally {
    			if (buffer.getByteBuf() != null) {
    				if (buffer.getByteBuf()
    				          .refCnt() != 0) {
    					ReferenceCountUtil.release(buffer.getByteBuf());
    				}
    			}
    		}
    	}
    	catch (Throwable err) {
    		Exceptions.throwIfFatal(err);
    		if (channelSubscriber != null) {
    			channelSubscriber.onError(err);
    		}
    		else {
    			throw err;
    		}
    	}
    }

    实例 5


    /**
     * Test try to reproduce issue #1335
     */
    @Test
    public void testBindMultiple() throws Exception {
        DefaultChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
        NioEventLoopGroup group = new NioEventLoopGroup();
        try {
            for (int i = 0; i < 100; i++) {
                Bootstrap udpBootstrap = new Bootstrap();
                udpBootstrap.group(group).channel(NioDatagramChannel.class)
                        .option(ChannelOption.SO_BROADCAST, true)
                        .handler(new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                                // Discard
                                ReferenceCountUtil.release(msg);
                            }
                        });
                DatagramChannel datagramChannel = (DatagramChannel) udpBootstrap
                        .bind(new InetSocketAddress(0)).syncUninterruptibly().channel();
                channelGroup.add(datagramChannel);
            }
            Assert.assertEquals(100, channelGroup.size());
        } finally {
            channelGroup.close().sync();
            group.shutdownGracefully().sync();
        }
    }

    实例 6


    @BeforeClass
    public static void init() {
        // Configure a test server
        group = new LocalEventLoopGroup();
        ServerBootstrap sb = new ServerBootstrap();
        sb.group(group)
          .channel(LocalServerChannel.class)
          .childHandler(new ChannelInitializer<LocalChannel>() {
              @Override
              public void initChannel(LocalChannel ch) throws Exception {
                  ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                      @Override
                      public void channelRead(ChannelHandlerContext ctx, Object msg) {
                          // Discard
                          ReferenceCountUtil.release(msg);
                      }
                  });
              }
          });
        localAddr = (LocalAddress) sb.bind(LocalAddress.ANY).syncUninterruptibly().channel().localAddress();
    }

    实例 7


    @BeforeClass
    public static void init() {
        // Configure a test server
        group = new LocalEventLoopGroup();
        ServerBootstrap sb = new ServerBootstrap();
        sb.group(group)
                .channel(LocalServerChannel.class)
                .childHandler(new ChannelInitializer<LocalChannel>() {
                    @Override
                    public void initChannel(LocalChannel ch) throws Exception {
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                                // Discard
                                ReferenceCountUtil.release(msg);
                            }
                        });
                    }
                });
        localAddr = (LocalAddress) sb.bind(LocalAddress.ANY).syncUninterruptibly().channel().localAddress();
    }

    实例 8


    @Override
    public ChannelGroupFuture write(Object message, ChannelMatcher matcher) {
        if (message == null) {
            throw new NullPointerException("message");
        }
        if (matcher == null) {
            throw new NullPointerException("matcher");
        }
        Map<Channel, ChannelFuture> futures = new LinkedHashMap<Channel, ChannelFuture>(size());
        for (Channel c: nonServerChannels) {
            if (matcher.matches(c)) {
                futures.put(c, c.write(safeDuplicate(message)));
            }
        }
        ReferenceCountUtil.release(message);
        return new DefaultChannelGroupFuture(this, futures, executor);
    }

    实例 9


    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (isRemote(ctx)) {
            ByteBuf payload = (ByteBuf) msg;
            byte[] data = getPayloadFromByteBuf(payload);
            writeBuffer(data);
            return;
        }
        ReferenceCountUtil.retain(msg);
        // propagate the data to rest of handlers in pipeline
        ctx.fireChannelRead(msg);
    }

    实例 10


    @Override
    public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
        // The first message must be authentication response
        if (this.authenticationUrl != null && (this.cookies == null || this.cookies.isEmpty())) {
            HttpResponse response = (HttpResponse) msg;
            CharSequence cookieData = response.headers().get(new AsciiString("set-cookie"));
            if (cookieData != null) {
                this.cookies = ServerCookieDecoder.decode(cookieData.toString());
                if (this.cookies == null || this.cookies.isEmpty()) {
                    throw new WebSocketAuthenticationFailureException("Could not authenticate");
                }
                if (log.isDebugEnabled()) {
                    for (Cookie cookie : this.cookies) {
                        log.debug("Server says must set cookie with name {} and value {}", cookie.name(), cookie.value());
                    }
                }
            } else {
                throw new ITException("Could not authenticate");
            }
            if (log.isDebugEnabled()) {
                log.debug("Authentication succeeded for user {}", this.user);
            }
            handShaker.handshake(ctx.channel());
            return;
        }
        // The second one must be the response for web socket handshake
        if (!handShaker.isHandshakeComplete()) {
            handShaker.finishHandshake(ctx.channel(), (FullHttpResponse) msg);
            if (log.isDebugEnabled()) {
                log.debug("Web socket client connected for user {}", this.user);
            }
            handshakeFuture.setSuccess();
            return;
        }
        // Take the byte buff and send it up to Stomp decoder
        if (msg instanceof WebSocketFrame) {
            if (log.isDebugEnabled()) {
                if (msg instanceof TextWebSocketFrame) {
                    log.debug("Received text frame {}", ((TextWebSocketFrame) msg).text());
                }
            }
            ReferenceCountUtil.retain(msg);
            ctx.fireChannelRead(((WebSocketFrame) msg).content());
        }
    }

    实例 11


    @Override
    protected void encode(ChannelHandlerContext ctx, DefaultHttpMessage defaultHttpMessage, List out) throws Exception {
        if (defaultHttpMessage.headers().contains(HttpHeaders.CONTENT_LENGTH, "", true)) {
            defaultHttpMessage.headers().remove(HttpHeaders.CONTENT_LENGTH);
        }
        ReferenceCountUtil.retain(defaultHttpMessage);
        out.add(defaultHttpMessage);
    }

    实例 12


    private static Object safeDuplicate(Object message) {
        if (message instanceof ByteBuf) {
            return ((ByteBuf) message).duplicate().retain();
        } else if (message instanceof ByteBufHolder) {
            return ((ByteBufHolder) message).duplicate().retain();
        } else {
            return ReferenceCountUtil.retain(message);
        }
    }

    实例 13


    @Override
    public void onNext(T t) {
        // Retain so that post-buffer, the ByteBuf does not get released.
        // Release will be done after reading from the subject.
        ReferenceCountUtil.retain(t);
        state.bufferedObserver.onNext(t);
        // Schedule timeout once and when not subscribed yet.
        if (state.casTimeoutScheduled() && state.state == State.STATES.UNSUBSCRIBED.ordinal()) {
            timeoutScheduler.subscribe(new Action1<Long>() { // Schedule timeout after the first content arrives.
                @Override
                public void call(Long aLong) {
                    disposeIfNotSubscribed();
                }
            });
        }
    }

    实例 14


    @Override
    protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
        Channel channel = ctx.channel();
        if (msg instanceof HttpRequest) {
            HttpRequest request = (HttpRequest) msg;
            if (handleRequest(request, channel, ctx)) {
                if (httpMethodInfoBuilder.getHttpResourceModel()
                        .isStreamingReqSupported() &&
                        channel.pipeline().get("aggregator") != null) {
                    channel.pipeline().remove("aggregator");
                } else if (!httpMethodInfoBuilder.getHttpResourceModel()
                        .isStreamingReqSupported() &&
                        channel.pipeline().get("aggregator") == null) {
                    channel.pipeline().addAfter("router", "aggregator",
                            new HttpObjectAggregator(Integer.MAX_VALUE));
                }
            }
            ReferenceCountUtil.retain(msg);
            ctx.fireChannelRead(msg);
        } else if (msg instanceof HttpContent) {
            ReferenceCountUtil.retain(msg);
            ctx.fireChannelRead(msg);
        }
    }

    实例 15


    @Override
    public void onData(final ByteBuf input) {
        // We need to retain until the serializer gets around to processing it.
        ReferenceCountUtil.retain(input);
        serializer.execute(new Runnable() {
            @Override
            public void run() {
                if (isTraceBytes()) {
                    TRACE_BYTES.info("Received: {}", ByteBufUtil.hexDump(input));
                }
                ByteBuffer source = input.nioBuffer();
                do {
                    ByteBuffer buffer = protonTransport.getInputBuffer();
                    int limit = Math.min(buffer.remaining(), source.remaining());
                    ByteBuffer duplicate = source.duplicate();
                    duplicate.limit(source.position() + limit);
                    buffer.put(duplicate);
                    protonTransport.processInput();
                    source.position(source.position() + limit);
                } while (source.hasRemaining());
                ReferenceCountUtil.release(input);
                // Process the state changes from the latest data and then answer back
                // any pending updates to the Broker.
                processUpdates();
                pumpToProtonTransport();
            }
        });
    }

     

  • 相关阅读:
    [极客大挑战 2019]BuyFlag
    [极客大挑战 2019]BabySQL
    [网鼎杯 2018]Fakebook
    C语言学习笔记_内存数据和字符串
    剑指OFFER_数据流中的中位数
    剑指OFFER_滑动窗口的最大值
    剑指OFFER_矩阵中的路径
    C语言学习笔记_指针相关知识
    剑指OFFER_机器人的运动范围
    剑指OFFER_剪绳子
  • 原文地址:https://www.cnblogs.com/shihaiming/p/9561838.html
Copyright © 2011-2022 走看看