zoukankan      html  css  js  c++  java
  • Netty5 服务端和客户端-copy

    概述
    Netty 5 已经被官方放弃(见下面的 issue 链接)。不过就学习而言,Netty 4 和 Netty 5 的差别不大,本例子基于 Netty 5。

    https://github.com/netty/netty/issues/4466

    线程安全
    一个thread + 队列 == 一个单线程线程池。线程安全的,任务是线性串行执行的

    线程安全,不会产生阻塞效应 ,使用对象组

    线程不安全,会产生阻塞效应, 使用对象池

    Figure 1. 对象池
    Figure 2. 对象组
    writeAndFlush 发送消息是线程安全的
    其中的 EventExecutor executor 就是 NioEventLoop,它是单线程的事件循环(见下面的类声明),因此是线程安全的

    public final class NioEventLoop extends SingleThreadEventLoop
    代码
    Server
    /**
     * Netty 5 server example.
     *
     * <p>Binds a TCP listener, installs a string codec plus {@code ServerHandler}
     * on every accepted connection, and blocks until the listening channel closes.
     */
    public class Server {

        /** TCP port the server listens on. */
        private static final int PORT = 10101;

        /**
         * Starts the server and blocks until the server channel is closed.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            // Bootstrap helper for the server side.
            ServerBootstrap bootstrap = new ServerBootstrap();

            // Boss and worker event loop groups.
            EventLoopGroup boss = new NioEventLoopGroup();
            EventLoopGroup worker = new NioEventLoopGroup();

            try {
                // Set the thread pools.
                bootstrap.group(boss, worker);

                // Set the socket (channel) factory.
                bootstrap.channel(NioServerSocketChannel.class);

                // Set the pipeline factory for accepted connections.
                bootstrap.childHandler(new ChannelInitializer<Channel>() {

                    @Override
                    protected void initChannel(Channel ch) throws Exception {
                        ch.pipeline().addLast(new StringDecoder());
                        ch.pipeline().addLast(new StringEncoder());
                        ch.pipeline().addLast(new ServerHandler());
                    }
                });

                // The corresponding Netty 3 settings were:
                //   bootstrap.setOption("backlog", 1024);
                //   bootstrap.setOption("tcpNoDelay", true);
                //   bootstrap.setOption("keepAlive", true);
                // TCP parameters:
                // server socket channel setting: size of the connection backlog
                bootstrap.option(ChannelOption.SO_BACKLOG, 2048);
                // socket channel setting: keep connections alive and clear dead ones
                bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
                // socket channel setting: turn off delayed sending
                bootstrap.childOption(ChannelOption.TCP_NODELAY, true);

                // Bind the port and wait for the bind to finish, so that a bind
                // failure is reported here and "start" is only printed once the
                // server is actually listening.
                ChannelFuture future = bootstrap.bind(PORT).sync();

                System.out.println("start");

                // Wait until the server channel is closed.
                future.channel().closeFuture().sync();

            } catch (InterruptedException e) {
                // Preserve the interrupt status for whoever runs this thread.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                // Release resources.
                boss.shutdownGracefully();
                worker.shutdownGracefully();
            }
        }
    }
    ServerHandler
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    /**
     * Server-side message handler.
     *
     * <p>Prints every received string and answers with {@code "hi"}; also logs
     * connection open/close events and closes the connection on errors.
     */
    public class ServerHandler extends SimpleChannelInboundHandler<String> {

        /**
         * Prints the received message and writes the reply.
         *
         * @param ctx context of this handler in the channel pipeline
         * @param msg the decoded message
         */
        @Override
        protected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {

            System.out.println(msg);

            // NOTE(review): the reply is sent twice, once through the channel and
            // once through the handler context - presumably to demonstrate both
            // ways of writing; confirm that two replies per message are intended.
            ctx.channel().writeAndFlush("hi");
            ctx.writeAndFlush("hi");
        }

        /**
         * A new client has connected.
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("channelActive");
        }

        /**
         * A client has disconnected.
         */
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("channelInactive");
        }

        /**
         * An exception was raised for this connection.
         *
         * <p>The stack trace is printed and the connection is closed, so a failed
         * channel does not linger in an undefined state.
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }

    }
    Client
    import java.io.BufferedReader;
    import java.io.InputStreamReader;

    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;

    /**
     * Netty 5 client example.
     *
     * <p>Connects to the server on 127.0.0.1:10101 and sends every line typed on
     * standard input until the input ends.
     */
    public class Client {

        /**
         * Connects to the server and forwards console input to it.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            // Bootstrap helper for the client side.
            Bootstrap bootstrap = new Bootstrap();

            // Worker event loop group.
            EventLoopGroup worker = new NioEventLoopGroup();

            try {
                // Set the thread pool.
                bootstrap.group(worker);

                // Set the socket (channel) factory.
                bootstrap.channel(NioSocketChannel.class);

                // Set the pipeline.
                bootstrap.handler(new ChannelInitializer<Channel>() {

                    @Override
                    protected void initChannel(Channel ch) throws Exception {
                        ch.pipeline().addLast(new StringDecoder());
                        ch.pipeline().addLast(new StringEncoder());
                        ch.pipeline().addLast(new ClientHandler());
                    }
                });

                // Wait for the connect attempt to finish: a failed connect is
                // reported here instead of the user typing into a dead channel.
                ChannelFuture connect = bootstrap.connect("127.0.0.1", 10101).sync();
                Channel channel = connect.channel();

                BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
                while (true) {
                    System.out.println("请输入:");
                    String msg = bufferedReader.readLine();
                    if (msg == null) {
                        // End of input: stop instead of trying to write null.
                        break;
                    }
                    channel.writeAndFlush(msg);
                }

            } catch (InterruptedException e) {
                // Preserve the interrupt status for whoever runs this thread.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                worker.shutdownGracefully();
            }
        }
    }
    ClientHandler
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    /**
     * Client-side message handler: echoes every message received from the
     * server to standard output.
     */
    public class ClientHandler extends SimpleChannelInboundHandler<String> {

        /**
         * Prints the received message with a fixed prefix.
         *
         * @param ctx context of this handler in the channel pipeline
         * @param msg the decoded message
         */
        @Override
        protected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
            final String line = "客户端收到消息:" + msg;
            System.out.println(line);
        }

    }
    多连接客户端
    MultClient

    其中 synchronized(channel) 也可以改用单线程任务队列的方式来实现

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicInteger;

    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;

    /**
     * Client that keeps several connections to the same server and hands them
     * out in round-robin order, reconnecting slots whose channel is not active.
     */
    public class MultClient {

        /** Server address used by the no-argument constructor. */
        private static final String DEFAULT_HOST = "192.168.0.103";

        /** Server port used by the no-argument constructor. */
        private static final int DEFAULT_PORT = 10101;

        /**
         * Bootstrap helper shared by all connections.
         */
        private final Bootstrap bootstrap = new Bootstrap();

        /**
         * Sessions (one channel per slot).
         */
        private final List<Channel> channels = new ArrayList<>();

        /**
         * Round-robin counter used to pick the next slot.
         */
        private final AtomicInteger index = new AtomicInteger();

        /** Server address every connection is opened to. */
        private final String host;

        /** Server port every connection is opened to. */
        private final int port;

        /** Event loop group created by {@link #init(int)}; {@code null} before that. */
        private EventLoopGroup worker;

        /**
         * Creates a client for the default server address.
         */
        public MultClient() {
            this(DEFAULT_HOST, DEFAULT_PORT);
        }

        /**
         * Creates a client for the given server address.
         *
         * @param host server host name or IP address
         * @param port server TCP port
         */
        public MultClient(String host, int port) {
            this.host = host;
            this.port = port;
        }

        /**
         * Initializes the bootstrap and opens the connections.
         *
         * <p>All connects are started first and then awaited, so that when this
         * method returns every slot holds a channel whose connect attempt has
         * finished. Failed attempts are not reported here; their slot is
         * reconnected later by {@link #nextChannel()}.
         *
         * @param count number of connections to open
         */
        public void init(int count) {

            // Worker event loop group.
            worker = new NioEventLoopGroup();

            // Set the thread pool.
            bootstrap.group(worker);

            // Set the socket (channel) factory.
            bootstrap.channel(NioSocketChannel.class);

            // Set the pipeline.
            bootstrap.handler(new ChannelInitializer<Channel>() {

                @Override
                protected void initChannel(Channel ch) throws Exception {
                    ch.pipeline().addLast(new StringDecoder());
                    ch.pipeline().addLast(new StringEncoder());
                    ch.pipeline().addLast(new ClientHandler());
                }
            });

            List<ChannelFuture> pending = new ArrayList<>();
            for (int i = 1; i <= count; i++) {
                ChannelFuture future = bootstrap.connect(host, port);
                channels.add(future.channel());
                pending.add(future);
            }

            // Without this wait the first nextChannel() call would find the
            // still-connecting channels inactive and reconnect every one of them.
            for (ChannelFuture future : pending) {
                future.awaitUninterruptibly();
            }
        }

        /**
         * Returns the next usable session.
         *
         * @return an active channel
         * @throws RuntimeException if no slot currently holds an active channel
         */
        public Channel nextChannel() {
            return getFirstActiveChannel(0);
        }

        /**
         * Releases the event loop group created by {@link #init(int)}.
         * Does nothing when {@code init} has not been called.
         */
        public void shutdown() {
            if (worker != null) {
                worker.shutdownGracefully();
            }
        }

        /**
         * Walks the slots in round-robin order and returns the first active
         * channel, triggering a reconnect for every inactive one it passes.
         *
         * <p>Each call examines at most {@code channels.size() - count} slots, so
         * a full scan started with {@code count == 0} visits every slot once and
         * never reconnects the same slot twice.
         *
         * @param count number of slots already examined
         * @return an active channel
         * @throws RuntimeException if the scan finds no active channel
         */
        private Channel getFirstActiveChannel(int count) {
            int size = channels.size();
            for (int attempt = count; attempt < size; attempt++) {
                Channel channel = channels.get(Math.abs(index.getAndIncrement() % size));
                if (channel.isActive()) {
                    return channel;
                }
                // Reconnect this slot and try the next one.
                reconnect(channel);
            }
            throw new RuntimeException("no can use channel");
        }

        /**
         * Replaces the given channel with a freshly connecting one.
         *
         * <p>Callers racing on the same dead channel are serialized on it; the
         * ones arriving late no longer find it in the list and return without
         * opening another connection.
         *
         * @param channel the inactive channel to replace
         */
        private void reconnect(Channel channel) {
            synchronized (channel) {
                int slot = channels.indexOf(channel);
                if (slot == -1) {
                    return;
                }

                Channel newChannel = bootstrap.connect(host, port).channel();
                channels.set(slot, newChannel);

                // Drop the replaced channel so a connect that is still pending on
                // it cannot complete later and leave an unreferenced connection.
                channel.close();
            }
        }

    }
    Start

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    /**
    * 启动类
    *
    *
    */
    public class Start {

    public static void main(String[] args) {

    MultClient client = new MultClient();
    client.init(5);

    BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
    while(true){
    try {
    System.out.println("请输入:");
    String msg = bufferedReader.readLine();
    client.nextChannel().writeAndFlush(msg);
    } catch (Exception e) {
    e.printStackTrace();
    }
    }
    }

    ————————————————
    版权声明:本文为CSDN博主「chenshiying007」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
    原文链接:https://blog.csdn.net/qq_27384769/article/details/80674108

  • 相关阅读:
    AGC 018E.Sightseeing Plan(组合 DP)
    BZOJ.4767.两双手(组合 容斥 DP)
    AGC 001E.BBQ Hard(组合 DP)
    洛谷.3960.列队(线段树/树状数组)
    Codeforces Round #514 (Div. 2)
    10.4 正睿国庆集训测试 青岛
    Codeforces.264E.Roadside Trees(线段树 DP LIS)
    BZOJ.4653.[NOI2016]区间(线段树)
    Ansible安装部署以及常用模块详解
    Linux系统诊断必备技能之二:tcpdump抓包工具详解
  • 原文地址:https://www.cnblogs.com/hanease/p/14471639.html
Copyright © 2011-2022 走看看