zoukankan      html  css  js  c++  java
  • Disruptor与Netty实现百万级(十)

    实体对象:

    import java.io.Serializable;
    
    /**
     * Serializable message payload with three string fields: an id, a name and
     * the message body.
     */
    public class TranslatorData implements Serializable {

        private static final long serialVersionUID = 8763561286199081881L;

        /** Identifier of the message. */
        private String id;

        /** Name of the message. */
        private String name;

        /** Body content of the transmitted message. */
        private String message;

        // ---- readers ----

        /** @return the message identifier, or {@code null} if never set */
        public String getId() {
            return this.id;
        }

        /** @return the message name, or {@code null} if never set */
        public String getName() {
            return this.name;
        }

        /** @return the message body, or {@code null} if never set */
        public String getMessage() {
            return this.message;
        }

        // ---- writers ----

        /** @param id the message identifier to store */
        public void setId(String id) {
            this.id = id;
        }

        /** @param name the message name to store */
        public void setName(String name) {
            this.name = name;
        }

        /** @param message the message body to store */
        public void setMessage(String message) {
            this.message = message;
        }
    }
    

      

    import com.bfxy.codec.MarshallingCodeCFactory;
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.buffer.PooledByteBufAllocator;
    import io.netty.channel.AdaptiveRecvByteBufAllocator;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    
    /**
     * Netty TCP server whose child channels decode/encode messages with the
     * Marshalling codec and pass them to {@link ServerHandler}.
     *
     * <p>Constructing an instance binds the port and then blocks the calling
     * thread until the server channel is closed; both event loop groups are shut
     * down before the constructor returns.
     */
    public class NettyServer {

        /** Port bound by the no-argument constructor. */
        private static final int DEFAULT_PORT = 8765;

        /**
         * Starts the server on port 8765 and blocks until the server channel closes.
         */
        public NettyServer() {
            this(DEFAULT_PORT);
        }

        /**
         * Starts the server on the given port and blocks until the server channel closes.
         *
         * @param port TCP port to bind
         */
        public NettyServer(int port) {
            // 1. Two event loop groups: one accepts network connections,
            //    the other does the actual I/O processing.
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workGroup = new NioEventLoopGroup();

            // 2. Bootstrap helper
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            try {
                serverBootstrap.group(bossGroup, workGroup)
                        .channel(NioServerSocketChannel.class)
                        .option(ChannelOption.SO_BACKLOG, 1024)
                        // Adaptive (dynamically sized) receive buffer
                        .option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT)
                        // Pooled buffer allocation
                        .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                        // Logging
                        .handler(new LoggingHandler(LogLevel.INFO))
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel sc) throws Exception {
                                // Install the codec and the business handler in the pipeline
                                sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                                sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                                sc.pipeline().addLast(new ServerHandler());
                            }
                        });
                // Bind the port and wait synchronously for the bind to complete
                ChannelFuture cf = serverBootstrap.bind(port).sync();
                System.err.println("Server Startup...");
                // Block until the server channel is closed
                cf.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers can still observe the interruption
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } finally {
                // Graceful shutdown
                bossGroup.shutdownGracefully();
                workGroup.shutdownGracefully();
                System.err.println("Sever ShutDown...");
            }
        }
    }
    

      

    import com.bfxy.disruptor.MessageProducer;
    import com.bfxy.disruptor.RingBufferWorkerPoolFactory;
    import com.bfxy.entity.TranslatorData;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    
    /**
     * Server-side inbound handler that hands every received message, together
     * with its channel context, to a {@link MessageProducer} obtained from
     * {@link RingBufferWorkerPoolFactory}.
     */
    public class ServerHandler extends ChannelInboundHandlerAdapter {

        /**
         * Casts the inbound message to {@link TranslatorData} and publishes it
         * through the producer registered under a fixed producer id.
         *
         * @param ctx context of the channel the message arrived on
         * @param msg decoded inbound message; must be a {@link TranslatorData}
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            final TranslatorData payload = (TranslatorData) msg;
            // A real application should have its own id generation rule; a fixed id is used here.
            RingBufferWorkerPoolFactory.getInstance()
                    .getMessageProducer("code:sessionId:001")
                    .onData(payload, ctx);
        }
    }
    

      

    import com.bfxy.disruptor.MessageConsumer;
    import com.bfxy.entity.TranslatorData;
    import com.bfxy.entity.TranslatorDataWapper;
    
    import io.netty.channel.ChannelHandlerContext;
    
    /**
     * Server-side consumer: prints each request to stderr and writes back a
     * response whose fields are the request fields prefixed with {@code "resp: "}.
     */
    public class MessageConsumerImpl4Server extends MessageConsumer {

        /**
         * @param consumerId identifier passed on to {@link MessageConsumer}
         */
        public MessageConsumerImpl4Server(String consumerId) {
            super(consumerId);
        }

        /**
         * Handles one event: logs the request, then writes and flushes the
         * response on the channel context carried by the event.
         *
         * @param event wrapper holding the request and its channel context
         */
        public void onEvent(TranslatorDataWapper event) throws Exception {
            final TranslatorData request = event.getData();
            final ChannelHandlerContext ctx = event.getCtx();

            // 1. Business logic
            String logLine = "Sever端: id= " + request.getId()
                    + ", name= " + request.getName()
                    + ", message= " + request.getMessage();
            System.err.println(logLine);

            // 2. Send the response back
            ctx.writeAndFlush(toResponse(request));
        }

        /** Builds the response by prefixing every request field with "resp: ". */
        private static TranslatorData toResponse(TranslatorData request) {
            TranslatorData reply = new TranslatorData();
            reply.setId("resp: " + request.getId());
            reply.setName("resp: " + request.getName());
            reply.setMessage("resp: " + request.getMessage());
            return reply;
        }
    }
    

      客户端:

    import com.bfxy.codec.MarshallingCodeCFactory;
    import com.bfxy.entity.TranslatorData;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.buffer.PooledByteBufAllocator;
    import io.netty.channel.AdaptiveRecvByteBufAllocator;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    
    /**
     * Netty TCP client that connects to {@link #HOST}:{@link #PORT} on
     * construction and can then send a batch of {@link TranslatorData} requests.
     */
    public class NettyClient {

        public static final String HOST = "127.0.0.1";
        public static final int PORT = 8765;

        // Possible extension: pool channels in a ConcurrentHashMap<String, Channel>.
        /** Connected channel; stays {@code null} if the connect attempt was interrupted. */
        private Channel channel;

        // 1. Event loop group that performs the actual I/O work.
        private final EventLoopGroup workGroup = new NioEventLoopGroup();

        /** Future of the connect attempt; stays {@code null} if the attempt was interrupted. */
        private ChannelFuture cf;

        /** Creates the client and connects synchronously to {@link #HOST}:{@link #PORT}. */
        public NettyClient() {
            this.connect(HOST, PORT);
        }

        /**
         * Configures the bootstrap and connects synchronously.
         *
         * @param host remote host
         * @param port remote port
         */
        private void connect(String host, int port) {
            // 2. Bootstrap helper (the client uses Bootstrap, not ServerBootstrap)
            Bootstrap bootstrap = new Bootstrap();
            try {
                bootstrap.group(workGroup)
                        .channel(NioSocketChannel.class)
                        // Adaptive (dynamically sized) receive buffer
                        .option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT)
                        // Pooled buffer allocation
                        .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel sc) throws Exception {
                                // handler(...) keeps only the handler set last, so a separate
                                // handler(new LoggingHandler(...)) call would be overwritten by
                                // this initializer; install the logger in the pipeline instead.
                                sc.pipeline().addLast(new LoggingHandler(LogLevel.INFO));
                                sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                                sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                                sc.pipeline().addLast(new ClientHandler());
                            }
                        });
                // Connect and wait synchronously for the connection to be established
                this.cf = bootstrap.connect(host, port).sync();
                System.err.println("Client connected...");

                // Keep the channel so that data can be sent afterwards
                this.channel = cf.channel();

            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers can still observe the interruption
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }

        /**
         * Sends ten requests (ids "0".."9") over the connected channel.
         *
         * @throws IllegalStateException if the client never obtained a channel
         */
        public void sendData() {
            if (this.channel == null) {
                throw new IllegalStateException("Client is not connected to " + HOST + ":" + PORT);
            }
            for (int i = 0; i < 10; i++) {
                TranslatorData request = new TranslatorData();
                request.setId("" + i);
                request.setName("请求消息名称 " + i);
                request.setMessage("请求消息内容 " + i);
                this.channel.writeAndFlush(request);
            }
        }

        /**
         * Waits until the channel has been closed, then shuts the event loop
         * group down. This method does not itself initiate the close of the channel.
         * The event loop group is shut down even if waiting fails or no
         * connection was ever established.
         */
        public void close() throws Exception {
            try {
                if (cf != null) {
                    cf.channel().closeFuture().sync();
                }
            } finally {
                // Graceful shutdown
                workGroup.shutdownGracefully();
                System.err.println("Sever ShutDown...");
            }
        }
    }
    

      

    import com.bfxy.disruptor.MessageProducer;
    import com.bfxy.disruptor.RingBufferWorkerPoolFactory;
    import com.bfxy.entity.TranslatorData;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.util.ReferenceCountUtil;
    
    /**
     * Client-side inbound handler that hands every received message, together
     * with its channel context, to a {@link MessageProducer} obtained from
     * {@link RingBufferWorkerPoolFactory}.
     */
    public class ClientHandler extends ChannelInboundHandlerAdapter {

        /**
         * Casts the inbound message to {@link TranslatorData} and publishes it
         * through the producer registered under a fixed producer id.
         *
         * @param ctx context of the channel the message arrived on
         * @param msg decoded inbound message; must be a {@link TranslatorData}
         */
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            final TranslatorData payload = (TranslatorData) msg;
            RingBufferWorkerPoolFactory.getInstance()
                    .getMessageProducer("code:seesionId:002")
                    .onData(payload, ctx);
        }
    }
    

      

    import com.bfxy.disruptor.MessageConsumer;
    import com.bfxy.entity.TranslatorData;
    import com.bfxy.entity.TranslatorDataWapper;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.util.ReferenceCountUtil;
    
    /**
     * Client-side consumer: prints every response received from the server to stderr.
     */
    public class MessageConsumerImpl4Client extends MessageConsumer {

        /**
         * @param consumerId identifier passed on to {@link MessageConsumer}
         */
        public MessageConsumerImpl4Client(String consumerId) {
            super(consumerId);
        }

        /**
         * Handles one event by printing the id, name and message of the response.
         * The channel context carried by the event is not needed here.
         *
         * @param event wrapper holding the response
         */
        public void onEvent(TranslatorDataWapper event) throws Exception {
            TranslatorData response = event.getData();
            // Business logic
            try {
                System.err.println("Client端: id= " + response.getId()
                        + ", name= " + response.getName()
                        + ", message= " + response.getMessage());
            } finally {
                // release(...) only acts on ReferenceCounted objects; it is kept so that a
                // reference-counted payload subtype would still be released after use.
                ReferenceCountUtil.release(response);
            }
        }
    }
    

      

     工厂类的封装:

    import java.io.Serializable;
    
    /**
     * Serializable message payload with three string fields: an id, a name and
     * the message body.
     */
    public class TranslatorData implements Serializable {

        private static final long serialVersionUID = 8763561286199081881L;

        /** Identifier of the message. */
        private String id;

        /** Name of the message. */
        private String name;

        /** Body content of the transmitted message. */
        private String message;

        // ---- readers ----

        /** @return the message identifier, or {@code null} if never set */
        public String getId() {
            return this.id;
        }

        /** @return the message name, or {@code null} if never set */
        public String getName() {
            return this.name;
        }

        /** @return the message body, or {@code null} if never set */
        public String getMessage() {
            return this.message;
        }

        // ---- writers ----

        /** @param id the message identifier to store */
        public void setId(String id) {
            this.id = id;
        }

        /** @param name the message name to store */
        public void setName(String name) {
            this.name = name;
        }

        /** @param message the message body to store */
        public void setMessage(String message) {
            this.message = message;
        }
    }
    

      

    import io.netty.channel.ChannelHandlerContext;
    /**
     * Event object carried through the Disruptor: pairs the actual message data
     * with the channel context it belongs to.
     */
    public class TranslatorDataWapper {

        /** The actual data. */
        private TranslatorData data;

        /** The channel handler context associated with the data. */
        private ChannelHandlerContext ctx;

        /** @return the wrapped data, or {@code null} if none has been set */
        public TranslatorData getData() {
            return this.data;
        }

        /** @return the channel context, or {@code null} if none has been set */
        public ChannelHandlerContext getCtx() {
            return this.ctx;
        }

        /** @param data the data to carry */
        public void setData(TranslatorData data) {
            this.data = data;
        }

        /** @param ctx the channel context to carry */
        public void setCtx(ChannelHandlerContext ctx) {
            this.ctx = ctx;
        }
    }
    

      

    import com.bfxy.entity.TranslatorDataWapper;
    import com.lmax.disruptor.WorkHandler;
    /**
     * Abstract base for Disruptor work handlers of {@link TranslatorDataWapper}
     * events. It only stores a consumer id; subclasses provide the event handling.
     */
    public abstract class MessageConsumer implements WorkHandler<TranslatorDataWapper> {

        /** Identifier of this consumer. */
        protected String consumerId;

        /**
         * @param consumerId identifier of this consumer
         */
        public MessageConsumer(String consumerId) {
            setConsumerId(consumerId);
        }

        /** @param consumerId new identifier of this consumer */
        public final void setConsumerId(String consumerId) {
            this.consumerId = consumerId;
        }

        /** @return the identifier of this consumer */
        public String getConsumerId() {
            return this.consumerId;
        }
    }
    

      

    import com.bfxy.entity.TranslatorData;
    import com.bfxy.entity.TranslatorDataWapper;
    import com.lmax.disruptor.RingBuffer;
    
    import io.netty.channel.ChannelHandlerContext;
    
    /**
     * Producer that publishes {@link TranslatorData} plus its channel context
     * into a ring buffer of {@link TranslatorDataWapper} events.
     */
    public class MessageProducer {

        private final String producerId;

        private final RingBuffer<TranslatorDataWapper> ringBuffer;

        /**
         * @param producerId identifier of this producer
         * @param ringBuffer ring buffer the events are published to
         */
        public MessageProducer(String producerId, RingBuffer<TranslatorDataWapper> ringBuffer) {
            this.ringBuffer = ringBuffer;
            this.producerId = producerId;
        }

        /**
         * Claims the next ring buffer slot, stores the data and the context in
         * it and publishes the slot.
         *
         * @param data the actual object to send
         * @param ctx  channel context belonging to the data
         */
        public void onData(TranslatorData data, ChannelHandlerContext ctx) {
            final long claimed = ringBuffer.next();
            try {
                TranslatorDataWapper slot = ringBuffer.get(claimed);
                slot.setCtx(ctx);
                slot.setData(data);
            } finally {
                // Publish in finally so that a claimed sequence is always published.
                ringBuffer.publish(claimed);
            }
        }
    }
    

      

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.Executors;
    import com.bfxy.entity.TranslatorDataWapper;
    import com.lmax.disruptor.EventFactory;
    import com.lmax.disruptor.ExceptionHandler;
    import com.lmax.disruptor.RingBuffer;
    import com.lmax.disruptor.SequenceBarrier;
    import com.lmax.disruptor.WaitStrategy;
    import com.lmax.disruptor.WorkerPool;
    import com.lmax.disruptor.dsl.ProducerType;
    
    /**
     * Singleton factory that owns the ring buffer and the worker pool and hands
     * out {@link MessageProducer} instances keyed by producer id.
     */
    public class RingBufferWorkerPoolFactory {

        /** Singleton implemented through a static nested holder class. */
        private static class SingletonHolder {
            static final RingBufferWorkerPoolFactory instance = new RingBufferWorkerPoolFactory();
        }

        /** Not instantiable from outside; use {@link #getInstance()}. */
        private RingBufferWorkerPoolFactory() {
        }

        /** @return the single factory instance */
        public static RingBufferWorkerPoolFactory getInstance() {
            return SingletonHolder.instance;
        }

        // Producer pool. Declared by its concrete type so that the atomic
        // putIfAbsent used in getMessageProducer is available.
        private static final ConcurrentHashMap<String, MessageProducer> producers =
                new ConcurrentHashMap<String, MessageProducer>();

        // Consumer pool
        private static final ConcurrentHashMap<String, MessageConsumer> consumers =
                new ConcurrentHashMap<String, MessageConsumer>();

        private RingBuffer<TranslatorDataWapper> ringBuffer;

        private SequenceBarrier sequenceBarrier;

        private WorkerPool<TranslatorDataWapper> workerPool;

        /**
         * Builds the ring buffer and the worker pool and starts the consumers.
         *
         * @param type             whether there is a single producer or multiple producers
         * @param bufferSize       size of the ring buffer
         * @param waitStrategy     wait strategy used by the ring buffer
         * @param messageConsumers consumers that process the events
         */
        public void initAndStart(ProducerType type, int bufferSize, WaitStrategy waitStrategy, MessageConsumer[] messageConsumers) {
            // 1. Build the ring buffer
            this.ringBuffer = RingBuffer.create(type,
                    new EventFactory<TranslatorDataWapper>() {
                        public TranslatorDataWapper newInstance() {
                            return new TranslatorDataWapper();
                        }
                    },
                    bufferSize,
                    waitStrategy);

            // 2. Create the sequence barrier
            this.sequenceBarrier = this.ringBuffer.newBarrier();

            // 3. Create the worker pool
            this.workerPool = new WorkerPool<TranslatorDataWapper>(
                    this.ringBuffer,
                    this.sequenceBarrier,
                    new EventExceptionHandler(), messageConsumers);

            // 4. Register the consumers in the pool map
            for (MessageConsumer mc : messageConsumers) {
                consumers.put(mc.getConsumerId(), mc);
            }

            // 5. Add the worker sequences as gating sequences
            this.ringBuffer.addGatingSequences(this.workerPool.getWorkerSequences());

            // 6. Start the worker pool. Each work handler occupies one thread for as
            //    long as it runs, so the executor needs one thread per consumer. With
            //    fewer threads the surplus handlers stay queued and never consume,
            //    while their sequences still gate the ring buffer. A size derived
            //    from the core count could also be 0, which newFixedThreadPool rejects.
            int threadCount = Math.max(1, messageConsumers.length);
            this.workerPool.start(Executors.newFixedThreadPool(threadCount));
        }

        /**
         * Returns the producer registered under the given id, creating and
         * registering it on first use. Concurrent callers asking for the same id
         * receive the same instance.
         *
         * @param producerId id of the producer
         * @return the producer for that id
         * @throws IllegalStateException if {@link #initAndStart} has not been called yet
         */
        public MessageProducer getMessageProducer(String producerId) {
            MessageProducer existing = producers.get(producerId);
            if (existing != null) {
                return existing;
            }
            if (this.ringBuffer == null) {
                throw new IllegalStateException("initAndStart must be called before getMessageProducer");
            }
            MessageProducer created = new MessageProducer(producerId, this.ringBuffer);
            // putIfAbsent is atomic: if another thread registered a producer first, use that one.
            MessageProducer winner = producers.putIfAbsent(producerId, created);
            return winner != null ? winner : created;
        }

        /**
         * Exception handler for the worker pool. Reports failures on stderr
         * instead of dropping them silently.
         */
        static class EventExceptionHandler implements ExceptionHandler<TranslatorDataWapper> {
            public void handleEventException(Throwable ex, long sequence, TranslatorDataWapper event) {
                System.err.println("Disruptor consumer failed at sequence " + sequence);
                ex.printStackTrace();
            }

            public void handleOnStartException(Throwable ex) {
                System.err.println("Disruptor consumer failed on start");
                ex.printStackTrace();
            }

            public void handleOnShutdownException(Throwable ex) {
                System.err.println("Disruptor consumer failed on shutdown");
                ex.printStackTrace();
            }
        }
    }
    

      

    import io.netty.handler.codec.marshalling.DefaultMarshallerProvider;
    import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider;
    import io.netty.handler.codec.marshalling.MarshallerProvider;
    import io.netty.handler.codec.marshalling.MarshallingDecoder;
    import io.netty.handler.codec.marshalling.MarshallingEncoder;
    import io.netty.handler.codec.marshalling.UnmarshallerProvider;
    import org.jboss.marshalling.MarshallerFactory;
    import org.jboss.marshalling.Marshalling;
    import org.jboss.marshalling.MarshallingConfiguration;
    
    /**
     * Factory for the JBoss Marshalling based Netty decoder and encoder.
     */
    public final class MarshallingCodeCFactory {

        /**
         * Creates the JBoss Marshalling decoder.
         *
         * @return a new MarshallingDecoder
         */
        public static MarshallingDecoder buildMarshallingDecoder() {
            // The provider is built from the marshaller factory and its configuration.
            UnmarshallerProvider provider =
                    new DefaultUnmarshallerProvider(serialFactory(), versionFiveConfiguration());
            // Second argument: maximum length of a single serialized message.
            return new MarshallingDecoder(provider, 1024 * 1024 * 1);
        }

        /**
         * Creates the JBoss Marshalling encoder, which serializes POJOs that
         * implement the serialization interface into binary form.
         *
         * @return a new MarshallingEncoder
         */
        public static MarshallingEncoder buildMarshallingEncoder() {
            MarshallerProvider provider =
                    new DefaultMarshallerProvider(serialFactory(), versionFiveConfiguration());
            return new MarshallingEncoder(provider);
        }

        /** Looks up the marshaller factory named "serial" (the Java serialization factory). */
        private static MarshallerFactory serialFactory() {
            return Marshalling.getProvidedMarshallerFactory("serial");
        }

        /** Creates a fresh marshalling configuration with the version set to 5. */
        private static MarshallingConfiguration versionFiveConfiguration() {
            MarshallingConfiguration config = new MarshallingConfiguration();
            config.setVersion(5);
            return config;
        }
    }
    

      生产者的逻辑:

    import com.bfxy.disruptor.MessageConsumer;
    import com.bfxy.entity.TranslatorData;
    import com.bfxy.entity.TranslatorDataWapper;
    import io.netty.channel.ChannelHandlerContext;
    /**
     * Server-side consumer: prints each request to stderr and writes back a
     * response whose fields are the request fields prefixed with {@code "resp: "}.
     */
    public class MessageConsumerImpl4Server extends MessageConsumer {

        /**
         * @param consumerId identifier passed on to {@link MessageConsumer}
         */
        public MessageConsumerImpl4Server(String consumerId) {
            super(consumerId);
        }

        /**
         * Handles one event: logs the request, then writes and flushes the
         * response on the channel context carried by the event.
         *
         * @param event wrapper holding the request and its channel context
         */
        public void onEvent(TranslatorDataWapper event) throws Exception {
            final TranslatorData request = event.getData();
            final ChannelHandlerContext ctx = event.getCtx();

            // 1. Business logic
            String logLine = "Sever端: id= " + request.getId()
                    + ", name= " + request.getName()
                    + ", message= " + request.getMessage();
            System.err.println(logLine);

            // 2. Send the response back
            ctx.writeAndFlush(toResponse(request));
        }

        /** Builds the response by prefixing every request field with "resp: ". */
        private static TranslatorData toResponse(TranslatorData request) {
            TranslatorData reply = new TranslatorData();
            reply.setId("resp: " + request.getId());
            reply.setName("resp: " + request.getName());
            reply.setMessage("resp: " + request.getMessage());
            return reply;
        }
    }
    

      消费者的逻辑:

    import com.bfxy.disruptor.MessageConsumer;
    import com.bfxy.entity.TranslatorData;
    import com.bfxy.entity.TranslatorDataWapper;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.util.ReferenceCountUtil;
    
    /**
     * Client-side consumer: prints every response received from the server to stderr.
     */
    public class MessageConsumerImpl4Client extends MessageConsumer {

        /**
         * @param consumerId identifier passed on to {@link MessageConsumer}
         */
        public MessageConsumerImpl4Client(String consumerId) {
            super(consumerId);
        }

        /**
         * Handles one event by printing the id, name and message of the response.
         * The channel context carried by the event is not needed here.
         *
         * @param event wrapper holding the response
         */
        public void onEvent(TranslatorDataWapper event) throws Exception {
            TranslatorData response = event.getData();
            // Business logic
            try {
                System.err.println("Client端: id= " + response.getId()
                        + ", name= " + response.getName()
                        + ", message= " + response.getMessage());
            } finally {
                // release(...) only acts on ReferenceCounted objects; it is kept so that a
                // reference-counted payload subtype would still be released after use.
                ReferenceCountUtil.release(response);
            }
        }
    }
    

      

    import com.lmax.disruptor.dsl.ProducerType;
    
    /**
     * Spring Boot entry point of the server: starts the Spring application,
     * starts the Disruptor worker pool with four server-side consumers and then
     * starts the Netty server.
     */
    @SpringBootApplication
    public class NettyServerApplication {

        public static void main(String[] args) {
            SpringApplication.run(NettyServerApplication.class, args);

            // Build the consumers that will process the inbound requests.
            final int consumerCount = 4;
            MessageConsumer[] serverConsumers = new MessageConsumer[consumerCount];
            for (int idx = 0; idx < serverConsumers.length; idx++) {
                serverConsumers[idx] = new MessageConsumerImpl4Server("code:serverId:" + idx);
            }

            // Multi-producer ring buffer with a blocking wait strategy
            // (YieldingWaitStrategy is a possible alternative).
            RingBufferWorkerPoolFactory factory = RingBufferWorkerPoolFactory.getInstance();
            factory.initAndStart(
                    ProducerType.MULTI,
                    1024 * 1024,
                    new BlockingWaitStrategy(),
                    serverConsumers);

            new NettyServer();
        }
    }
    

      

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    import com.bfxy.client.MessageConsumerImpl4Client;
    import com.bfxy.client.NettyClient;
    import com.bfxy.disruptor.MessageConsumer;
    import com.bfxy.disruptor.RingBufferWorkerPoolFactory;
    import com.lmax.disruptor.BlockingWaitStrategy;
    import com.lmax.disruptor.YieldingWaitStrategy;
    import com.lmax.disruptor.dsl.ProducerType;
    
    /**
     * Spring Boot entry point of the client: starts the Spring application,
     * starts the Disruptor worker pool with four client-side consumers, then
     * connects to the server and sends the requests.
     */
    @SpringBootApplication
    public class NettyClientApplication {

        public static void main(String[] args) {
            SpringApplication.run(NettyClientApplication.class, args);

            // Build the consumers that will process the responses.
            final int consumerCount = 4;
            MessageConsumer[] clientConsumers = new MessageConsumer[consumerCount];
            for (int idx = 0; idx < clientConsumers.length; idx++) {
                clientConsumers[idx] = new MessageConsumerImpl4Client("code:clientId:" + idx);
            }

            // Multi-producer ring buffer with a blocking wait strategy
            // (YieldingWaitStrategy is a possible alternative).
            RingBufferWorkerPoolFactory factory = RingBufferWorkerPoolFactory.getInstance();
            factory.initAndStart(
                    ProducerType.MULTI,
                    1024 * 1024,
                    new BlockingWaitStrategy(),
                    clientConsumers);

            // Establish the connection and send the messages
            NettyClient client = new NettyClient();
            client.sendData();
        }
    }
    

      

  • 相关阅读:
    etcd的原理分析
    (转)Linux sort命令
    随机森林
    python 类的定义和继承
    python random
    Spark源码阅读(1): Stage划分
    Mac 上安装MySQL
    Python 删除 数组
    在循环中将多列数组组合成大数组
    准确率 召回率
  • 原文地址:https://www.cnblogs.com/sunliyuan/p/11006162.html
Copyright © 2011-2022 走看看