zoukankan      html  css  js  c++  java
  • Netty5 服务端和客户端-copy

    概述
    Netty 5 已被官方放弃(见下方 issue 链接)。不过就学习而言,Netty 4 与 Netty 5 差别不大,本例基于 Netty 5 编写。

    https://github.com/netty/netty/issues/4466

    线程安全
    一个thread + 队列 == 一个单线程线程池。线程安全的,任务是线性串行执行的

    线程安全,不会产生阻塞效应 ,使用对象组

    线程不安全,会产生阻塞效应, 使用对象池

    Figure 1. 对象池
    Figure 2. 对象组
    writeAndFlush 发送消息是线程安全的。
    其中 EventExecutor executor 实际上是 NioEventLoop(单线程事件循环),任务在其中串行执行,因此是线程安全的:

    public final class NioEventLoop extends SingleThreadEventLoop
    代码
    Server
    /**
     * Netty 5 echo-style server.
     *
     * <p>Boots a {@link ServerBootstrap} with a boss group and a worker group,
     * installs a String decoder/encoder plus {@link ServerHandler} on every
     * accepted channel, binds to {@link #PORT} and then blocks until the
     * server channel is closed.
     */
    public class Server {

        /** TCP port the server listens on. */
        private static final int PORT = 10101;

        public static void main(String[] args) {
            // Bootstrap helper for the server side.
            ServerBootstrap bootstrap = new ServerBootstrap();

            // Two event-loop groups are handed to the bootstrap: boss and worker.
            EventLoopGroup boss = new NioEventLoopGroup();
            EventLoopGroup worker = new NioEventLoopGroup();

            try {
                // Register both event-loop groups.
                bootstrap.group(boss, worker);

                // Channel implementation used for the listening socket.
                bootstrap.channel(NioServerSocketChannel.class);

                // Pipeline installed on every accepted connection.
                bootstrap.childHandler(new ChannelInitializer<Channel>() {

                    @Override
                    protected void initChannel(Channel ch) throws Exception {
                        ch.pipeline().addLast(new StringDecoder());
                        ch.pipeline().addLast(new StringEncoder());
                        ch.pipeline().addLast(new ServerHandler());
                    }
                });

                // For comparison, the Netty 3 equivalents were:
                //   bootstrap.setOption("backlog", 1024);
                //   bootstrap.setOption("tcpNoDelay", true);
                //   bootstrap.setOption("keepAlive", true);

                // option(...) configures the listening channel: size of the pending-connection queue.
                bootstrap.option(ChannelOption.SO_BACKLOG, 2048);
                // childOption(...) configures accepted channels: keep-alive probing and no send delay.
                bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
                bootstrap.childOption(ChannelOption.TCP_NODELAY, true);

                // Wait for the bind to complete so that a failure (e.g. port already in
                // use) surfaces here instead of being silently ignored, and so that
                // "start" is only printed once the port is really open.
                ChannelFuture future = bootstrap.bind(PORT).sync();

                System.out.println("start");

                // Block until the server channel is closed.
                future.channel().closeFuture().sync();

            } catch (InterruptedException e) {
                // Preserve the interrupt status for whoever runs this thread.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                // Release the event-loop threads.
                boss.shutdownGracefully();
                worker.shutdownGracefully();
            }
        }
    }
    ServerHandler
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    /**
     * Server-side inbound handler for decoded {@code String} messages.
     *
     * <p>Prints every received message and answers with {@code "hi"} twice,
     * once through each of the two write entry points, and logs connection
     * life-cycle events to standard output.
     */
    public class ServerHandler extends SimpleChannelInboundHandler<String> {

        /**
         * Handles one decoded message from the client.
         *
         * @param ctx context of this handler in the channel pipeline
         * @param msg the decoded text message
         */
        @Override
        protected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {

            System.out.println(msg);

            // Both write styles are shown on purpose, so the client receives "hi" twice:
            // the first write is issued on the channel, the second on this handler's context.
            ctx.channel().writeAndFlush("hi");
            ctx.writeAndFlush("hi");
        }

        /**
         * Called when a new client connection becomes active.
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("channelActive");
        }

        /**
         * Called when a client connection is closed.
         */
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("channelInactive");
        }

        /**
         * Called when an exception reaches this handler.
         *
         * <p>The error is logged and the connection is closed: after an
         * unexpected failure the channel state is unknown, and leaving it open
         * would keep a possibly broken connection (and its resources) alive.
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }

    }
    Client
    import java.io.BufferedReader;
    import java.io.InputStreamReader;

    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;

    /**
     * Netty 5 console client.
     *
     * <p>Connects to the server on 127.0.0.1:10101 and sends every line typed
     * on standard input as one message. The program ends when standard input
     * reaches end-of-file or when an error occurs.
     */
    public class Client {

        public static void main(String[] args) {
            // Bootstrap helper for the client side.
            Bootstrap bootstrap = new Bootstrap();

            // Event-loop group that drives the connection.
            EventLoopGroup worker = new NioEventLoopGroup();

            try {
                // Register the event-loop group.
                bootstrap.group(worker);

                // Channel implementation used for the client socket.
                bootstrap.channel(NioSocketChannel.class);

                // Pipeline: String decoder/encoder followed by the client handler.
                bootstrap.handler(new ChannelInitializer<Channel>() {

                    @Override
                    protected void initChannel(Channel ch) throws Exception {
                        ch.pipeline().addLast(new StringDecoder());
                        ch.pipeline().addLast(new StringEncoder());
                        ch.pipeline().addLast(new ClientHandler());
                    }
                });

                // Wait until the connection is established; otherwise the first
                // writes could be issued on a channel that is not connected yet,
                // and a connection failure would go unnoticed.
                ChannelFuture connect = bootstrap.connect("127.0.0.1", 10101).sync();
                Channel channel = connect.channel();

                BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
                while (true) {
                    System.out.println("请输入:");
                    String msg = bufferedReader.readLine();
                    if (msg == null) {
                        // End of input: nothing more to send.
                        break;
                    }
                    channel.writeAndFlush(msg);
                }

            } catch (InterruptedException e) {
                // Preserve the interrupt status for whoever runs this thread.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                // Release the event-loop threads.
                worker.shutdownGracefully();
            }
        }
    }
    ClientHandler
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    /**
     * Client-side inbound handler: prints each decoded text message that
     * arrives from the server to standard output.
     */
    public class ClientHandler extends SimpleChannelInboundHandler<String> {

        /**
         * Prints the received message with a fixed prefix.
         *
         * @param ctx context of this handler in the channel pipeline (unused)
         * @param msg the decoded text message
         */
        @Override
        protected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
            final String line = "客户端收到消息:" + msg;
            System.out.println(line);
        }

    }
    多连接客户端
    MultClient

    其中 synchronized(channel) 的同步逻辑,也可以改用单线程任务队列的方式来实现。

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicInteger;

    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;

    /**
     * Client that keeps several connections to the same server and hands them
     * out in round-robin order.
     *
     * <p>Call {@link #init(int)} once to open the connections, then
     * {@link #nextChannel()} whenever a channel is needed. Inactive channels
     * are replaced by a fresh connection attempt when they are encountered.
     * Call {@link #shutdown()} to release the event-loop threads.
     */
    public class MultClient {

        /** Server host every connection is opened to. */
        private static final String HOST = "192.168.0.103";

        /** Server port every connection is opened to. */
        private static final int PORT = 10101;

        /**
         * Bootstrap shared by all connections.
         */
        private Bootstrap bootstrap = new Bootstrap();

        /**
         * The managed channels; a slot is overwritten when its channel is replaced.
         */
        private List<Channel> channels = new ArrayList<>();

        /**
         * Round-robin counter used to pick the next slot.
         */
        private final AtomicInteger index = new AtomicInteger();

        /**
         * Event-loop group created by {@link #init(int)}; kept so it can be shut down.
         */
        private EventLoopGroup worker;

        /**
         * Configures the bootstrap and opens {@code count} connections.
         *
         * <p>Each connect attempt is waited for, so that channels are usable
         * as soon as this method returns. A failed attempt does not abort
         * initialization: its (inactive) channel is still stored and will be
         * replaced on demand by {@link #nextChannel()}.
         *
         * @param count number of connections to open
         */
        public void init(int count) {

            // Event-loop group that drives all connections.
            worker = new NioEventLoopGroup();
            bootstrap.group(worker);

            // Channel implementation used for the client sockets.
            bootstrap.channel(NioSocketChannel.class);

            // Pipeline: String decoder/encoder followed by the client handler.
            bootstrap.handler(new ChannelInitializer<Channel>() {

                @Override
                protected void initChannel(Channel ch) throws Exception {
                    ch.pipeline().addLast(new StringDecoder());
                    ch.pipeline().addLast(new StringEncoder());
                    ch.pipeline().addLast(new ClientHandler());
                }
            });

            for (int i = 1; i <= count; i++) {
                // awaitUninterruptibly() waits for completion without throwing on
                // failure, so an unreachable server is still tolerated here.
                ChannelFuture future = bootstrap.connect(HOST, PORT).awaitUninterruptibly();
                channels.add(future.channel());
            }
        }

        /**
         * Returns the next active channel in round-robin order.
         *
         * @return an active channel
         * @throws IllegalStateException if no channels were created by {@link #init(int)}
         * @throws RuntimeException if no active channel could be found
         */
        public Channel nextChannel() {
            return getFirstActiveChannel(0);
        }

        /**
         * Releases the event-loop threads created by {@link #init(int)}.
         * Does nothing if {@code init} was never called.
         */
        public void shutdown() {
            if (worker != null) {
                worker.shutdownGracefully();
            }
        }

        /**
         * Walks the channel slots round-robin until an active channel is found.
         * Every inactive channel met on the way gets a reconnect attempt.
         *
         * @param count number of slots already tried
         * @return the first active channel encountered
         */
        private Channel getFirstActiveChannel(int count) {
            final int size = channels.size();
            if (size == 0) {
                // Without this guard the modulo below would divide by zero.
                throw new IllegalStateException("no channels available; call init(count) with count > 0 first");
            }

            for (int attempt = count; ; attempt++) {
                // Math.abs keeps the slot valid after the counter overflows to negative values.
                Channel channel = channels.get(Math.abs(index.getAndIncrement() % size));
                if (channel.isActive()) {
                    return channel;
                }

                // Start a replacement connection for the dead slot, then try the next one.
                reconnect(channel);
                if (attempt >= size) {
                    throw new RuntimeException("no can use channel");
                }
            }
        }

        /**
         * Replaces {@code channel} in its slot with a newly started connection.
         * The connect is not waited for, so the caller is never blocked here.
         *
         * @param channel the inactive channel to replace
         */
        private void reconnect(Channel channel) {
            synchronized (channel) {
                // If the channel is no longer stored, it has already been replaced.
                final int slot = channels.indexOf(channel);
                if (slot == -1) {
                    return;
                }

                Channel newChannel = bootstrap.connect(HOST, PORT).channel();
                channels.set(slot, newChannel);
            }
        }

    }
    Start

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    /**
    * 启动类
    *
    *
    */
    public class Start {

    public static void main(String[] args) {

    MultClient client = new MultClient();
    client.init(5);

    BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
    while(true){
    try {
    System.out.println("请输入:");
    String msg = bufferedReader.readLine();
    client.nextChannel().writeAndFlush(msg);
    } catch (Exception e) {
    e.printStackTrace();
    }
    }
    }

    ————————————————
    版权声明:本文为CSDN博主「chenshiying007」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
    原文链接:https://blog.csdn.net/qq_27384769/article/details/80674108

  • 相关阅读:
    Python 存储引擎 数据类型 主键
    Python 数据库
    Python 线程池进程池 异步回调 协程 IO模型
    Python GIL锁 死锁 递归锁 event事件 信号量
    Python 进程间通信 线程
    Python 计算机发展史 多道技术 进程 守护进程 孤儿和僵尸进程 互斥锁
    Python 异常及处理 文件上传事例 UDP socketserver模块
    Python socket 粘包问题 报头
    Django基础,Day7
    Django基础,Day6
  • 原文地址:https://www.cnblogs.com/hanease/p/14471639.html
Copyright © 2011-2022 走看看