zoukankan      html  css  js  c++  java
  • netty做客户端实战(二)

      和上一篇相比,此项目的场景有所不同:需要采集机房的电表、温湿度、水浸和烟感的数据,首先通过通讯管理机先将数据直采,然后通过服务器采集程序采集通讯管理机上存储的数据并解析入库。

      先说说这2个有什么不同,项目1中户外设备主动连接服务器,并定时通过指定端口发送报文,而此项目需要服务器去主动连接设备发送报文采集数据,仅仅需要开发netty的client端。

      项目框架:springboot+netty+mybatis+lombok+logback

      开发环境:idea2018+jdk1.8+mysql5.6.35+maven3.5.3

      项目搭建:

      1.快速搭建springboot项目,配置pom.xml文件依赖包

        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.mybatis.spring.boot</groupId>
                <artifactId>mybatis-spring-boot-starter</artifactId>
                <version>2.1.2</version>
            </dependency>
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <scope>runtime</scope>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <optional>true</optional>
            </dependency>
            <!-- Netty dependency; the version is supplied automatically by Spring Boot 2.0 -->
            <dependency>
                <groupId>io.netty</groupId>
                <artifactId>netty-all</artifactId>
            </dependency>
            <!-- Mail dependency -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-mail</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
                <exclusions>
                    <exclusion>
                        <groupId>org.junit.vintage</groupId>
                        <artifactId>junit-vintage-engine</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <!-- Write less code: use the Apache Commons utility libraries:
            https://www.cnblogs.com/ITtangtang/p/3966955.html-->
            <!--apache.commons.lang3-->
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-lang3</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-configuration-processor</artifactId>
                <optional>true</optional>
            </dependency>
            <!-- apache.codec: utility package of encoding methods
            https://blog.csdn.net/u012881904/article/details/52767853-->
            <dependency>
                <groupId>commons-codec</groupId>
                <artifactId>commons-codec</artifactId>
            </dependency>
        </dependencies>
        <build>
            <plugins>
                <!-- Spring Boot Maven plugin -->
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    </project>

    2.自定义netty的client端BootNettyClient类

    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;
    import org.springframework.util.StringUtils;
    import java.util.List;
    import java.util.concurrent.TimeUnit;
    
    /**
     * Netty TCP client that connects to every configured host on one shared port
     * and retries failed connection attempts every 10 seconds.
     *
     * <p>Hosts come from the comma-separated {@code netty.host} property and the
     * port from {@code netty.port}.
     */
    @Component
    @Slf4j
    public class BootNettyClient {
        @Autowired
        BootNioChannelInitializer bootNioChannelInitializer;
        @Value("${netty.port}")
        private Integer port;
        @Value("#{'${netty.host}'.split(',')}")
        private List<String> hosts;

        /**
         * Channels that are currently connected. It must be initialised: the connect
         * listener adds to it, and a null list caused a NullPointerException on the
         * first successful connection. A copy-on-write list is used because the
         * listeners run on event-loop threads.
         */
        private List<Channel> channels = new java.util.concurrent.CopyOnWriteArrayList<>();

        private Bootstrap bootstrap;
        // Event loop group handling connect/read/write events (a client has no accept events).
        private EventLoopGroup workGroup = new NioEventLoopGroup();

        /**
         * Configures the bootstrap and starts a connection attempt for every
         * configured host that has no channel registered in
         * {@code BootNettyClientHandler.ctxMap} yet.
         *
         * @throws Exception declared for callers; the visible code throws nothing checked
         */
        public void start() throws Exception {
            bootstrap = new Bootstrap();
            bootstrap.group(workGroup);
            // Use the NIO client socket channel.
            bootstrap.channel(NioSocketChannel.class);
            // Handlers for each new channel are installed by the initializer.
            bootstrap.handler(bootNioChannelInitializer);
            System.out.println("开始启动----");
            for (String host : hosts) {
                if (BootNettyClientHandler.ctxMap.get(host) == null) {
                    doConnect(host, port);
                }
            }
        }

        /**
         * Starts an asynchronous connection attempt. On failure the attempt is
         * rescheduled on the channel's event loop after 10 seconds, so it keeps
         * retrying until the connection succeeds.
         *
         * @param host remote host to connect to
         * @param port remote port to connect to
         */
        protected void doConnect(String host, int port) {
            ChannelFuture future = bootstrap.connect(host, port);
            future.addListener((ChannelFutureListener) connectResult -> {
                if (connectResult.isSuccess()) {
                    Channel connected = connectResult.channel();
                    channels.add(connected);
                    // Drop the channel again once it closes so the list does not
                    // grow without bound across reconnects.
                    connected.closeFuture().addListener(
                            (ChannelFutureListener) closed -> channels.remove(closed.channel()));
                    log.info(host + "Connect to server successfully!");
                } else {
                    log.info(host + "Failed to connect to server, try connect after 10s");
                    connectResult.channel().eventLoop().schedule(() -> {
                        log.info(host + "重新连接----");
                        doConnect(host, port); // retry until the connection succeeds
                    }, 10, TimeUnit.SECONDS);
                }
            });
        }
    }

    3.自定义初始化类BootNioChannelInitializer类

    import io.netty.channel.Channel;
    import io.netty.channel.ChannelInitializer;
    import io.netty.handler.timeout.IdleStateHandler;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.stereotype.Component;
    
    import java.util.concurrent.TimeUnit;
    
    /**
     * Installs the handler chain on every channel created by the client bootstrap:
     * idle detection, then the frame decoder, then the business handler.
     */
    @Component
    public class BootNioChannelInitializer<SocketChannel> extends ChannelInitializer<Channel> {
        @Autowired
        BootNettyClientHandler bootNettyClientHandler;
        // NOTE(review): the same injected decoder instance is added to every channel's
        // pipeline; confirm that MyDecoder is safe to share between channels.
        @Autowired
        MyDecoder myDecoder;

        /**
         * Builds the pipeline for a newly created channel.
         *
         * @param ch the channel being initialised
         * @throws Exception declared by the overridden method
         */
        @Override
        protected void initChannel(Channel ch) throws Exception {
            ch.pipeline()
                    // Reader-idle and all-idle events after 15 s; writer-idle detection disabled (0).
                    .addLast("idleStateHandler", new IdleStateHandler(15, 0, 15, TimeUnit.SECONDS))
                    .addLast("decoder", myDecoder)
                    // Business handler goes last in the pipeline.
                    .addLast(bootNettyClientHandler);
            System.out.println("初始化信道");
        }
    }

    4.自定义业务处理类BootNettyClientHandler类

    import com.rtst.dhjclistener.util.StringUitls;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.*;

    import io.netty.channel.socket.SocketChannel;
    import io.netty.handler.timeout.IdleState;
    import io.netty.handler.timeout.IdleStateEvent;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;
    
    import java.net.InetSocketAddress;
    import java.text.DecimalFormat;
    import java.text.SimpleDateFormat;
    import java.util.*;
    
    /**
     * Shared inbound handler for all client channels. It registers each connected
     * channel in {@link #ctxMap} keyed by remote IP, logs received frames, reconnects
     * when a channel goes inactive, and re-sends the poll command when a channel
     * has been read-idle.
     */
    @Component
    @Slf4j
    @ChannelHandler.Sharable
    public class BootNettyClientHandler extends ChannelInboundHandlerAdapter {
        /**
         * Connected channels keyed by the remote IP address, used to send commands to
         * a specific device. A concurrent map is used because it is written on
         * event-loop threads and read from the scheduled task and the client start-up code.
         */
        public static Map<String, Channel> ctxMap = new java.util.concurrent.ConcurrentHashMap<String, Channel>();

        // The fields below were referenced by the original code but never declared.
        // NOTE(review): this creates a bean cycle (client -> initializer -> handler -> client);
        // confirm that field injection resolves it on the Spring Boot version in use.
        @Autowired
        BootNettyClient bootNettyClient;
        @Value("${netty.port}")
        private Integer port;
        // Number of read-idle events since the last forced close.
        // NOTE(review): this counter is shared by every channel using this handler instance.
        private int triggeredNum;

        /** Returns the remote IP address of the channel behind {@code ctx}. */
        private static String remoteIp(ChannelHandlerContext ctx) {
            InetSocketAddress remote = (InetSocketAddress) ctx.channel().remoteAddress();
            return remote.getAddress().getHostAddress();
        }

        /**
         * Registers the newly connected channel under its remote IP unless an entry
         * for that IP already exists.
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            String clientIp = remoteIp(ctx);
            ctxMap.putIfAbsent(clientIp, ctx.channel());
            log.info(clientIp+"------连接成功-------");
        }

        /**
         * Logs the received message and hands it to the business logic.
         *
         * @param ctx channel context
         * @param msg decoded message; only its {@code toString()} form is used in the visible code
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg){
            SocketChannel channel = (SocketChannel) ctx.channel();
            ByteBuf buff = Unpooled.buffer(); // Netty transmits data as ByteBuf
            // Insert a space after every two characters for readable logging.
            String regex = "(.{2})";
            String input = msg.toString().replaceAll(regex, "$1 ");
            log.info(channel.remoteAddress().getHostString() + ": " + input);
            System.out.println("服务端接受信息为: " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 接收到消息:" + input);
            byte[] bytes = StringUitls.toByteArray(msg.toString());
            Map<String,Object> params = new LinkedHashMap<>();
            InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
            String clientIp = insocket.getAddress().getHostAddress(); // IP of the current connection
            // Business logic: parse the frame and store it in the database; if an alarm is
            // present, e-mail the user that the machine room has an alarm.
            // (The implementation is elided in the original article.)
            ......
        }

        /**
         * Called when the connection is lost: removes the channel from the map,
         * closes it and immediately starts a reconnect.
         *
         * @param ctx channel context
         */
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("客户端断开连接---channelInactive");
            log.info("客户端断开连接---channelInactive");
            super.channelInactive(ctx);
            String clientIp = remoteIp(ctx);
            // Removes the entry only if it still maps to this channel.
            ctxMap.remove(clientIp, ctx.channel());
            ctx.close();
            bootNettyClient.doConnect(clientIp, port); // reconnect after disconnect
        }

        /**
         * Called when an exception reaches this handler: logs it, removes the channel
         * from the map and closes the channel.
         *
         * @param ctx   channel context
         * @param cause the exception that was raised
         * @throws Exception declared by the overridden method
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            String clientIp = remoteIp(ctx);
            log.error(clientIp + " exceptionCaught", cause);
            ctxMap.remove(clientIp, ctx.channel());
            ctx.close();
        }

        /**
         * Handles idle events. On a reader-idle event the poll command is sent again,
         * and after three such events the channel is closed.
         */
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            System.out.println("触发读写空闲操作-----");
            String clientIp = remoteIp(ctx);
            if (evt instanceof IdleStateEvent){
                IdleStateEvent idleStateEvent = (IdleStateEvent) evt ;
                log.info(clientIp+"触发"+idleStateEvent.state()+"事件");
                if (idleStateEvent.state() == IdleState.READER_IDLE){
                    log.info("已经 好长时间没有收到信息!");
                    System.out.println("尝试再次发送命令");
                    // Send the poll command to the device. A larger address range can be
                    // requested, e.g. addresses 0-500: 00 00 00 00 00 06 c8 03 00 00 01 F4 05 c2
                    ByteBuf buf = Unpooled.buffer();
                    String order = "00 00 00 00 00 06 c8 03 00 00 00 64 05 c2";
                    byte[] msg = StringUitls.hexStrToBinaryStr(order);
                    buf.writeBytes(msg);
                    // Write the ByteBuf, not the raw byte[]: with no encoder in the pipeline
                    // a byte[] is rejected and CLOSE_ON_FAILURE would close the channel.
                    ctx.writeAndFlush(buf).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                    triggeredNum++;
                    if(triggeredNum>=3){
                        ctx.close();
                        triggeredNum=0;
                    }
                }
            }
            super.userEventTriggered(ctx, evt);
        }
    }

    5.application类实现CommandLineRunner,来启动nettyClient服务

    package com.rtst.dhjclistener;
    
    import com.rtst.dhjclistener.nettyclient.BootNettyClient;
    import org.mybatis.spring.annotation.MapperScan;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.scheduling.annotation.EnableScheduling;
    
    /**
     * Spring Boot entry point. Implements {@link CommandLineRunner} so that the
     * Netty client is started from {@link #run(String...)}.
     */
    @SpringBootApplication
    @MapperScan("com.rtst.dhjclistener.repository")
    @EnableScheduling
    public class DhjclistenerApplication implements CommandLineRunner {
        @Autowired
        BootNettyClient bootNettyClient;

        /**
         * Starts the Netty client connections.
         *
         * @param args command-line arguments (unused)
         * @throws Exception if the client fails to start
         */
        @Override
        public void run(String... args) throws Exception {
            bootNettyClient.start();
        }

        /** Launches the Spring application. */
        public static void main(String[] args) {
            SpringApplication.run(DhjclistenerApplication.class, args);
        }
    }

    6.定时任务发送报文到下位机,请求下位机采集存储的数据

    package com.rtst.dhjclistener.ordertask;
    
    
    import com.rtst.dhjclistener.nettyclient.BootNettyClientHandler;
    import com.rtst.dhjclistener.util.StringUitls;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.stereotype.Component;
    import org.springframework.util.StringUtils;
    
    import java.util.List;
    
    @Component
    @Configuration
    public class MyTask {
    @Value("#{'${netty.host}'.split(',')}")
    private List<String> host;
        @Scheduled(cron = "*/20 * * * * ?")//每20秒执行一次发送命令,此处根据自己实际需求设置时间
        public void order() {
            ByteBuf buf = Unpooled.buffer();
            String order = "00 00 00 00 00 06 c8 03 00 00 00 64 05 c2";
            byte[] msg = StringUitls.hexStrToBinaryStr(order);
            buf.writeBytes(msg);
            buf.retain(1);//在同时采集2个通讯管理机时会报异常,此行代码可以解决,如果只是采集一个通讯管理机时是不会存在该异常的
            for(int i=0;i<host.size();i++){
                if(StringUtils.isEmpty(BootNettyClientHandler.ctxMap.get(host.get(i)))){
                        System.out.println(host.get(i)+"channel对象为空");
                        continue;
                    }else{
                        BootNettyClientHandler.ctxMap.get(host.get(i)).writeAndFlush(buf);
                    }
            }
            System.out.println("发送命令成功");
            }
    }

      注意点:1.断线重连

          2.多个通讯管理机定时任务发送报文时,存在异常:同一个ByteBuf每次writeAndFlush后都会被释放一次引用计数,写给第二个channel时引用计数已为0。解决方法:调用buf.retain()增加引用计数(或为每个channel单独创建ByteBuf)。

    时光静好,与君语;细水长流,与君同;繁华落尽,与君老!
  • 相关阅读:
    24种设计模式之适配器模式
    内存分配与回收策略
    java 吞吐量
    JVM运行数据区
    垃圾收集算法学习
    对象的回收
    未来一段时间学习方向
    多线程并发容器
    python基础数据类型--list列表
    Sublime Text 快捷键
  • 原文地址:https://www.cnblogs.com/lyzj/p/13283107.html
Copyright © 2011-2022 走看看