zoukankan      html  css  js  c++  java
  • netty做客户端实战(二)

      和上一篇相比,此项目的场景有所不同:需要采集机房的电表、温湿度、水浸和烟感的数据。先由通讯管理机直接采集这些数据,再由服务器上的采集程序读取通讯管理机上存储的数据并解析入库。

      先说说这2个有什么不同,项目1中户外设备主动连接服务器,并定时通过指定端口发送报文,而此项目需要服务器去主动连接设备发送报文采集数据,仅仅需要开发netty的client端。

      项目框架:springboot+netty+mybatis+lombok+logback

      开发环境:idea2018+jdk1.8+mysql5.6.35+maven3.5.3

      项目搭建:

      1.快速搭建springboot项目,配置pom.xml文件依赖包

        <!-- Dependencies and build plugin for the Spring Boot + Netty + MyBatis collector project. -->
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.mybatis.spring.boot</groupId>
                <artifactId>mybatis-spring-boot-starter</artifactId>
                <version>2.1.2</version>
            </dependency>
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <scope>runtime</scope>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <optional>true</optional>
            </dependency>
            <!-- Netty dependency; the version is supplied automatically by Spring Boot 2.0 -->
            <dependency>
                <groupId>io.netty</groupId>
                <artifactId>netty-all</artifactId>
            </dependency>
            <!-- Mail dependency -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-mail</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
                <exclusions>
                    <exclusion>
                        <groupId>org.junit.vintage</groupId>
                        <artifactId>junit-vintage-engine</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <!-- Write less code by using the Apache Commons utility libraries:
            https://www.cnblogs.com/ITtangtang/p/3966955.html-->
            <!-- apache.commons.lang3 -->
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-lang3</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-configuration-processor</artifactId>
                <optional>true</optional>
            </dependency>
            <!-- apache.codec: utility library of encoding methods
            https://blog.csdn.net/u012881904/article/details/52767853-->
            <dependency>
                <groupId>commons-codec</groupId>
                <artifactId>commons-codec</artifactId>
            </dependency>
        </dependencies>
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    </project>

    2.自定义netty的client端BootNettyClient类

    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;
    import org.springframework.util.StringUtils;
    import java.util.List;
    import java.util.concurrent.TimeUnit;
    
    /**
     * Netty TCP client that connects to every configured device host and keeps
     * retrying connection attempts that fail.
     *
     * <p>Hosts are read from the comma-separated {@code netty.host} property and
     * all of them are contacted on the single {@code netty.port} port.
     */
    @Component
    @Slf4j
    public class BootNettyClient {
        @Autowired
        BootNioChannelInitializer bootNioChannelInitializer;
        @Value("${netty.port}")
        private Integer port;
        @Value("#{'${netty.host}'.split(',')}")
        private List<String> hosts;

        // Channels that are currently connected. The list must exist before the
        // first connect completes (it used to be null, so the connect listener
        // threw a NullPointerException), and it is mutated from event-loop
        // threads, hence a thread-safe implementation.
        private List<Channel> channels = new java.util.concurrent.CopyOnWriteArrayList<>();

        private Bootstrap bootstrap;
        // Event loop group for connect/read/write events; a client has no accept events.
        private EventLoopGroup workGroup = new NioEventLoopGroup();

        /**
         * Configures the bootstrap and starts a connection attempt for every
         * configured host that has no channel registered in
         * {@link BootNettyClientHandler#ctxMap} yet.
         *
         * @throws Exception declared for callers; the visible code throws nothing checked
         */
        public void start() throws Exception {
            bootstrap = new Bootstrap();
            bootstrap.group(workGroup);
            // Use NIO socket channels for the client side.
            bootstrap.channel(NioSocketChannel.class);
            // Install the pipeline initializer that handles read/write events.
            bootstrap.handler(bootNioChannelInitializer);
            System.out.println("开始启动----");
            for (String host : hosts) {
                if (BootNettyClientHandler.ctxMap.get(host) == null) {
                    doConnect(host, port);
                }
            }
        }

        /**
         * Starts an asynchronous connection attempt and schedules a retry after
         * 10 seconds whenever the attempt fails.
         *
         * @param host host name or IP address of the device
         * @param port TCP port of the device
         */
        protected void doConnect(String host, int port) {
            ChannelFuture future = bootstrap.connect(host, port);
            future.addListener((ChannelFutureListener) connectResult -> {
                if (connectResult.isSuccess()) {
                    Channel connected = connectResult.channel();
                    channels.add(connected);
                    // Forget the channel once it closes so the list does not grow
                    // with every reconnect.
                    connected.closeFuture().addListener(
                            (ChannelFutureListener) closed -> channels.remove(closed.channel()));
                    log.info(host+"Connect to server successfully!");
                    return;
                }
                log.info(host+"Failed to connect to server, try connect after 10s");
                connectResult.channel().eventLoop().schedule(() -> {
                    log.info(host+"重新连接----");
                    // Re-enter doConnect to retry the connection.
                    doConnect(host, port);
                }, 10, TimeUnit.SECONDS);
            });
        }
    }

    3.自定义初始化类BootNioChannelInitializer类

    import io.netty.channel.Channel;
    import io.netty.channel.ChannelInitializer;
    import io.netty.handler.timeout.IdleStateHandler;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.stereotype.Component;
    
    import java.util.concurrent.TimeUnit;
    
    /**
     * Channel initializer that installs the handlers for every new client
     * channel: an idle-state detector, the frame decoder and the business handler.
     *
     * <p>NOTE(review): the type parameter named {@code SocketChannel} is declared
     * but never used inside this class.
     * NOTE(review): the same injected {@code MyDecoder} instance is added to every
     * channel's pipeline; confirm that it is safe to share between channels.
     */
    @Component
    public class BootNioChannelInitializer<SocketChannel> extends ChannelInitializer<Channel> {
        @Autowired
        BootNettyClientHandler bootNettyClientHandler;
        @Autowired
        MyDecoder myDecoder;

        /**
         * Appends the handlers to the channel's pipeline in processing order.
         *
         * @param ch the channel being initialised
         */
        @Override
        protected void initChannel(Channel ch) throws Exception {
            // Reader-idle and all-idle timeouts of 15 s; writer-idle detection is off (0).
            IdleStateHandler idleWatcher = new IdleStateHandler(15, 0, 15, TimeUnit.SECONDS);
            ch.pipeline()
                    .addLast("idleStateHandler", idleWatcher)
                    .addLast("decoder", myDecoder)
                    // The business handler goes last so that it sees decoded messages.
                    .addLast(bootNettyClientHandler);
            System.out.println("初始化信道");
        }
    }

    4.自定义业务处理类BootNettyClientHandler类

    import com.rtst.dhjclistener.util.StringUitls;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.*;

    import io.netty.channel.socket.SocketChannel;
    import io.netty.handler.timeout.IdleState;
    import io.netty.handler.timeout.IdleStateEvent;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;
    
    import java.net.InetSocketAddress;
    import java.text.DecimalFormat;
    import java.text.SimpleDateFormat;
    import java.util.*;
    
    @Component
    @Slf4j
    @ChannelHandler.Sharable
    public class BootNettyClientHandler extends ChannelInboundHandlerAdapter {//  将当前客户端连接 存入map   实现控制设备下发 参数
        public  static Map<String, Channel> ctxMap = new LinkedHashMap<String, Channel>();
    @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); String clientIp = insocket.getAddress().getHostAddress();//获取现连接的IP地址 if(ctxMap.get(clientIp)!=null){//如果不为空就不存 }else{//否则就将当前的设备ip+端口存进map 当做下发设备的标识的key ctxMap.put(clientIp, ctx.channel()); } log.info(clientIp+"------连接成功-------"); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg){ SocketChannel channel = (SocketChannel) ctx.channel(); ByteBuf buff = Unpooled.buffer();//netty需要用ByteBuf传输 //将字符串转成每两个字符加空格形式的字符串 String regex = "(.{2})"; String input = msg.toString().replaceAll(regex, "$1 "); log.info(channel.remoteAddress().getHostString() + ": " + input); System.out.println("服务端接受信息为: " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 接收到消息:" + input); byte[] bytes = StringUitls.toByteArray(msg.toString()); Map<String,Object> params = new LinkedHashMap<>(); InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); String clientIp = insocket.getAddress().getHostAddress();//获取现连接的IP地址
         //业务处理逻辑,解析报文并入库,如果有告警,发送邮件提示用户机房存在告警
          ......
    } /** * 连接断开时进入该方法 * @param ctx */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("客户端断开连接---channelInactive"); log.info("客户端断开连接---channelInactive"); super.channelInactive(ctx); InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); String clientIp = insocket.getAddress().getHostAddress();//获取现连接的IP地址 if(ctxMap.get(clientIp)!=null){//如果不为空就删除 ctxMap.remove(clientIp, ctx.channel()); } ctx.close(); bootNettyClient.doConnect(clientIp,port);//断线重连 } /** * 出现异常时进入该方法 * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); String clientIp = insocket.getAddress().getHostAddress();//获取现连接的IP地址 if(ctxMap.get(clientIp)!=null){//如果不为空就删除 ctxMap.remove(clientIp, ctx.channel()); } ctx.close(); } //处理超时读写空闲事件 @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { System.out.println("触发读写空闲操作-----"); InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); String clientIp = insocket.getAddress().getHostAddress();//获取现连接的IP地址 if (evt instanceof IdleStateEvent){ IdleStateEvent idleStateEvent = (IdleStateEvent) evt ; log.info(clientIp+"触发"+idleStateEvent.state()+"事件"); //获取IdleStateEvent事件,根据状态是否为读状态空闲 if (idleStateEvent.state() == IdleState.READER_IDLE){ log.info("已经 好长时间没有收到信息!"); System.out.println("尝试再次发送命令"); //向下位机发送消息 ByteBuf buf =Unpooled.buffer(); String order = "00 00 00 00 00 06 c8 03 00 00 00 64 05 c2";//可以设置成读取更大地址的数据,比如读0-500的地址位:00 00 00 00 00 06 c8 03 00 00 01 F4 05 c2 byte[] msg = StringUitls.hexStrToBinaryStr(order); buf.writeBytes(msg); ctx.writeAndFlush(msg).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); triggeredNum++; if(triggeredNum>=3){ ctx.close(); triggeredNum=0; } } } super.userEventTriggered(ctx, evt); 
} }

    5.application类实现CommandLineRunner,来启动nettyClient服务

    package com.rtst.dhjclistener;
    
    import com.rtst.dhjclistener.nettyclient.BootNettyClient;
    import org.mybatis.spring.annotation.MapperScan;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.scheduling.annotation.EnableScheduling;
    
    /**
     * Spring Boot entry point of the collector application. It implements
     * {@link CommandLineRunner} so that the Netty client is started from
     * {@link #run(String...)}.
     */
    @SpringBootApplication
    @MapperScan("com.rtst.dhjclistener.repository")
    @EnableScheduling
    public class DhjclistenerApplication implements CommandLineRunner {
        @Autowired
        BootNettyClient bootNettyClient;

        /**
         * Starts the Netty client (this application acts as the client side and
         * connects out to the devices).
         *
         * @param args command-line arguments; not used here
         * @throws Exception if starting the client fails
         */
        @Override
        public void run(String... args) throws Exception {
            this.bootNettyClient.start();
        }

        /** Boots the Spring application. */
        public static void main(String[] args) {
            SpringApplication.run(DhjclistenerApplication.class, args);
        }
    }

    6.定时任务发送报文到下位机,请求下位机采集存储的数据

    package com.rtst.dhjclistener.ordertask;
    
    
    import com.rtst.dhjclistener.nettyclient.BootNettyClientHandler;
    import com.rtst.dhjclistener.util.StringUitls;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.stereotype.Component;
    import org.springframework.util.StringUtils;
    
    import java.util.List;
    
    @Component
    @Configuration
    public class MyTask {
    @Value("#{'${netty.host}'.split(',')}")
    private List<String> host;
        @Scheduled(cron = "*/20 * * * * ?")//每20秒执行一次发送命令,此处根据自己实际需求设置时间
        public void order() {
            ByteBuf buf = Unpooled.buffer();
            String order = "00 00 00 00 00 06 c8 03 00 00 00 64 05 c2";
            byte[] msg = StringUitls.hexStrToBinaryStr(order);
            buf.writeBytes(msg);
            buf.retain(1);//在同时采集2个通讯管理机时会报异常,此行代码可以解决,如果只是采集一个通讯管理机时是不会存在该异常的
            for(int i=0;i<host.size();i++){
                if(StringUtils.isEmpty(BootNettyClientHandler.ctxMap.get(host.get(i)))){
                        System.out.println(host.get(i)+"channel对象为空");
                        continue;
                    }else{
                        BootNettyClientHandler.ctxMap.get(host.get(i)).writeAndFlush(buf);
                    }
            }
            System.out.println("发送命令成功");
            }
    }

      注意点:1.断线重连

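          2.多个通讯管理机定时任务发送报文时,存在异常。原因:同一个ByteBuf被写入多个Channel,每次写出完成后Netty都会对它release一次,引用计数归零后再写入下一个Channel就会抛出异常。buf.retain(1)的作用只是把引用计数加1,恰好够两台通讯管理机使用;设备更多时仍会报错,更稳妥的做法是为每个Channel单独创建一个ByteBuf(例如Unpooled.wrappedBuffer(msg))。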

    时光静好,与君语;细水长流,与君同;繁华落尽,与君老!
  • 相关阅读:
    2.12 使用@DataProvider
    2.11 webdriver中使用 FileUtils ()
    Xcode8 添加PCH文件
    The app icon set "AppIcon" has an unassigned child告警
    Launch Image
    iOS App图标和启动画面尺寸
    iPhone屏幕尺寸、分辨率及适配
    Xcode下载失败 使用已购项目页面再试一次
    could not find developer disk image
    NSDate与 NSString 、long long类型的相互转化
  • 原文地址:https://www.cnblogs.com/lyzj/p/13283107.html
Copyright © 2011-2022 走看看