zoukankan      html  css  js  c++  java
  • Netty实现的一个异步Socket代码

     本人使用Netty实现的一个异步Socket客户端代码

    package test.core.nio;
    
    import com.google.common.util.concurrent.ThreadFactoryBuilder;
    import java.net.InetSocketAddress;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import lombok.SneakyThrows;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.commons.lang3.math.NumberUtils;
    import org.jboss.netty.bootstrap.ClientBootstrap;
    import org.jboss.netty.buffer.ChannelBuffer;
    import org.jboss.netty.buffer.ChannelBuffers;
    import org.jboss.netty.channel.Channel;
    import org.jboss.netty.channel.ChannelFuture;
    import org.jboss.netty.channel.ChannelFutureListener;
    import org.jboss.netty.channel.ChannelHandlerContext;
    import org.jboss.netty.channel.Channels;
    import org.jboss.netty.channel.ExceptionEvent;
    import org.jboss.netty.channel.MessageEvent;
    import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
    import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
    
    /**
     * Request/response socket client built on the Netty 3 ({@code org.jboss.netty}) NIO transport.
     *
     * <p>Each call to {@link #send(byte[])} opens a new channel to the configured address, writes the
     * request once the connection is established, and blocks the calling thread until the first
     * inbound buffer arrives, an I/O error occurs, or the timeout elapses. The channel is always
     * closed before {@code send} returns.
     *
     * <p>The boss/worker thread pools use daemon threads and are released by a JVM shutdown hook that
     * is registered once per instance.
     *
     * @author xfyou
     * @date 2019/3/21
     */
    @Slf4j
    public class AsyncSocket {

      /** Number of boss (connector) threads handed to the channel factory. */
      private static final byte CONNECTORS_POOL_SIZE = 1;

      /** Number of I/O worker threads handed to the channel factory. */
      private static final byte WORKERS_POOL_SIZE = 30;

      private final ClientBootstrap clientBootstrap;

      private final InetSocketAddress address;

      /** Timeout in milliseconds, used both as connect timeout and as the wait limit for a reply. */
      private final int timeout;

      /**
       * Creates the client and its thread pools; no connection is opened until {@link #send(byte[])}.
       *
       * @param hostIp  remote host name or IP address
       * @param port    remote port as a decimal string; parsed with {@code NumberUtils.toInt}, so an
       *                unparseable value yields port 0
       * @param timeout timeout in milliseconds for connecting and for waiting on a reply
       * @param name    prefix used in the names of the pool threads ({@code name-pool-N})
       */
      public AsyncSocket(String hostIp, String port, int timeout, String name) {
        final ThreadFactory threadFactory = new ThreadFactoryBuilder()
            .setDaemon(true)
            .setNameFormat(name + "-pool-%d")
            .setPriority(Thread.NORM_PRIORITY)
            .build();
        final ExecutorService connectors = new ThreadPoolExecutor(CONNECTORS_POOL_SIZE, CONNECTORS_POOL_SIZE, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory);
        final ExecutorService workers = new ThreadPoolExecutor(WORKERS_POOL_SIZE, WORKERS_POOL_SIZE, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory);
        clientBootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(connectors, workers, CONNECTORS_POOL_SIZE, WORKERS_POOL_SIZE));
        address = new InetSocketAddress(hostIp, NumberUtils.toInt(port));
        clientBootstrap.setOption("remoteAddress", address);
        clientBootstrap.setOption("connectTimeoutMillis", timeout);
        this.timeout = timeout;
        addShutdownHook();
      }

      /**
       * Sends {@code data} over a fresh connection and waits for the first inbound buffer.
       *
       * <p>Only the first buffer delivered by the transport is returned; a reply that the peer or the
       * network splits across several reads is not reassembled here.
       *
       * @param data request bytes to write once the connection is established
       * @return the readable bytes of the first received buffer, or {@code null} if the connection
       *         failed, an I/O error occurred, or nothing arrived within the timeout
       */
      @SneakyThrows
      public byte[] send(final byte[] data) {
        final SocketEventHandler socketEventHandler = new SocketEventHandler(timeout);
        final Channel channel = clientBootstrap.getFactory().newChannel(Channels.pipeline(socketEventHandler));
        // The channel is created straight from the factory rather than through
        // ClientBootstrap.connect(), so the connect timeout is set on the channel's own config.
        channel.getConfig().setConnectTimeoutMillis(timeout);
        try {
          channel.connect(address).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
              if (future.isSuccess()) {
                channel.write(ChannelBuffers.wrappedBuffer(data));
              } else {
                log.error("I/O operation has failed.", future.getCause());
                // Throwing here would only surface on the I/O thread; wake the caller instead so it
                // does not sit out the whole timeout for a connection that already failed.
                socketEventHandler.abort();
              }
            }
          });
          return socketEventHandler.getMessage();
        } finally {
          // Covers the timeout and interrupt paths, where no handler callback closes the channel.
          channel.close();
        }
      }

      /** Registers a JVM shutdown hook that releases the bootstrap's thread pools. */
      private void addShutdownHook() {
        Runtime.getRuntime().addShutdownHook(new Thread() {
          @Override
          public void run() {
            clientBootstrap.releaseExternalResources();
          }
        });
      }

      /**
       * Per-request upstream handler that captures the first inbound buffer and lets the calling
       * thread wait for it.
       */
      private static final class SocketEventHandler extends SimpleChannelUpstreamHandler {

        /** Written on an I/O thread, read by the caller possibly after a timed-out wait. */
        private volatile byte[] message;

        private final int timeout;

        /** Released on the first reply, on an I/O error, or on {@link #abort()}. */
        private final CountDownLatch latch = new CountDownLatch(1);

        SocketEventHandler(int timeout) {
          this.timeout = timeout;
        }

        /**
         * Blocks until a reply, an error, an abort, or the timeout.
         *
         * @return the captured reply bytes, or {@code null} if none was captured
         * @throws InterruptedException if the waiting thread is interrupted
         */
        byte[] getMessage() throws InterruptedException {
          // The boolean result is deliberately unused: on timeout message is still null.
          latch.await(timeout, TimeUnit.MILLISECONDS);
          return message;
        }

        /** Wakes the waiting caller without a reply. */
        void abort() {
          latch.countDown();
        }

        @Override
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
          if (null != e.getMessage()) {
            final ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
            // Copy only the readable region; array() would expose the whole backing array and is
            // not available for buffers without one.
            final byte[] received = new byte[buffer.readableBytes()];
            buffer.readBytes(received);
            message = received;
            latch.countDown();
          }
          if (null != ctx.getChannel()) {
            ctx.getChannel().close();
          }
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
          if (null != ctx.getChannel()) {
            ctx.getChannel().close();
          }
          log.error("An exception was raised by an I/O thread.", e.getCause());
          // No reply can follow on a closed channel; release the caller now.
          latch.countDown();
        }

      }

    }
  • 相关阅读:
    C#如何连接wifi和指定IP
    3.4 小结
    3.3.4.5 起始与清除
    3.3.4.4 打印行
    3.3.4.3 设置字段分隔字符
    3.3.4.2 字段
    3.3.4.1 模式与操作
    3.3.4 使用 awk 重新编排字段
    3.3.3 使用 join 连接字段
    3.3.2 使用 cut 选定字段
  • 原文地址:https://www.cnblogs.com/frankyou/p/10593279.html
Copyright © 2011-2022 走看看