zoukankan      html  css  js  c++  java
  • Netty实现的一个异步Socket代码

     本人写的一个使用Netty实现的一个异步Socket代码

    package test.core.nio;
    
    import com.google.common.util.concurrent.ThreadFactoryBuilder;
    import java.net.InetSocketAddress;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import lombok.SneakyThrows;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.commons.lang3.math.NumberUtils;
    import org.jboss.netty.bootstrap.ClientBootstrap;
    import org.jboss.netty.buffer.ChannelBuffer;
    import org.jboss.netty.buffer.ChannelBuffers;
    import org.jboss.netty.channel.Channel;
    import org.jboss.netty.channel.ChannelFuture;
    import org.jboss.netty.channel.ChannelFutureListener;
    import org.jboss.netty.channel.ChannelHandlerContext;
    import org.jboss.netty.channel.Channels;
    import org.jboss.netty.channel.ExceptionEvent;
    import org.jboss.netty.channel.MessageEvent;
    import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
    import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
    
    /**
     * @author xfyou
     * @date 2019/3/21
     */
    @Slf4j
    public class AsyncSocket {
    
      private final ClientBootstrap clientBootstrap;
    
      private final InetSocketAddress address;
    
      private int timeout;
    
      private static final byte CONNECTORS_POOL_SIZE = 1;
    
      private static final byte WORKERS_POOL_SIZE = 30;
    
      public AsyncSocket(String hostIp, String port, int timeout, String name) {
        final ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(name + "-pool-%d").setPriority(Thread.NORM_PRIORITY).build();
        final ExecutorService connectors = new ThreadPoolExecutor(CONNECTORS_POOL_SIZE, CONNECTORS_POOL_SIZE, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory);
        final ExecutorService workers = new ThreadPoolExecutor(WORKERS_POOL_SIZE, WORKERS_POOL_SIZE, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory);
        clientBootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(connectors, workers, CONNECTORS_POOL_SIZE, WORKERS_POOL_SIZE));
        address = new InetSocketAddress(hostIp, NumberUtils.toInt(port));
        clientBootstrap.setOption("remoteAddress", address);
        clientBootstrap.setOption("connectTimeoutMillis", timeout);
        this.timeout = timeout;
        addShutdownHook();
      }
    
      @SneakyThrows
      public byte[] send(final byte[] data) {
        final SocketEventHandler socketEventHandler = new SocketEventHandler(timeout);
        final Channel channel = clientBootstrap.getFactory().newChannel(Channels.pipeline(socketEventHandler));
        final ChannelFuture future = channel.connect(address);
        future.addListener(new ChannelFutureListener() {
          @Override
          public void operationComplete(ChannelFuture future) throws Exception {
            if (future.isSuccess()) {
              channel.write(ChannelBuffers.wrappedBuffer(data));
            } else {
              log.error("I/O operation has failed.", future.getCause());
              throw (Exception) future.getCause();
            }
          }
        });
        return socketEventHandler.getMessage();
      }
    
      private void addShutdownHook() {
        Runtime.getRuntime().addShutdownHook(new Thread() {
          @Override
          public void run() {
            clientBootstrap.releaseExternalResources();
          }
        });
      }
    
      private class SocketEventHandler extends SimpleChannelUpstreamHandler {
    
        private byte[] message;
    
        private int timeout;
    
        private final CountDownLatch latch = new CountDownLatch(1);
    
        SocketEventHandler(int timeout) {
          this.timeout = timeout;
        }
    
        @SneakyThrows
        byte[] getMessage() {
          latch.await(timeout, TimeUnit.MILLISECONDS);
          return message;
        }
    
        @Override
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
          if (null != e.getMessage()) {
            message = ((ChannelBuffer) e.getMessage()).array();
            latch.countDown();
          }
          if (null != ctx.getChannel()) {
            ctx.getChannel().close();
          }
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
          if (null != ctx.getChannel()) {
            ctx.getChannel().close();
          }
          log.error("An exception was raised by an I/O thread.", e.getCause());
        }
    
      }
    
    }
  • 相关阅读:
    点击对应不同name的button,显示不同name的弹窗(弹窗功能)
    点击添加本地图片的前端效果制作
    巧用margin/padding的百分比值实现高度自适应(多用于占位,避免闪烁)
    移动端取消touch高亮效果
    手机网站的几点注意
    图片自动切换+链接
    使用DOM的方法获取所有li元素,然后使用jQuery()构造函数把它封装为jQuery对象
    使用jQuery匹配文档中所有的li元素,返回一个jQuery对象,然后通过数组下标的方式读取jQuery集合中第1个DOM元素,此时返回的是DOM对象,然后调用DOM属性innerHTML,读取该元素 包含的文本信息
    利用jQuery扩展接口为jQuery框架定义了两个自定义函数,然后调用这两个函数
    jQuery链式语法演示
  • 原文地址:https://www.cnblogs.com/frankyou/p/10593279.html
Copyright © 2011-2022 走看看