关于最近我所学的netty内容,在语雀上
https://www.yuque.com/u492757/psgrno/yl91yg
Netty tcp在netty中的使用
MyServer
package com.zhetang.netty; import com.zhetang.mapper.mysqlMapper.BaseUserMapper; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.string.StringEncoder; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.Resource; import java.nio.charset.Charset; import java.util.concurrent.TimeUnit; /** * Created with IntelliJ IDEA. * User:wq * Date:2021/4/19 * Time: 20:12 * Description: No Description */ @Slf4j @Component public class MyServer { // @Value("${my.server.port}") // private static Integer port; @Resource private BaseUserMapper userMapper; private static final Integer portt =8082; public static void init(){ //创建两个线程组 boosGroup、workerGroup EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { //创建服务端的启动对象,设置参数 ServerBootstrap bootstrap = new ServerBootstrap(); //设置两个线程组boosGroup和workerGroup bootstrap.group(bossGroup, workerGroup) //设置服务端通道实现类型 .channel(NioServerSocketChannel.class) //设置线程队列得到连接个数 .option(ChannelOption.SO_BACKLOG, 128) //设置保持活动连接状态 .childOption(ChannelOption.SO_KEEPALIVE, true) //使用匿名内部类的形式初始化通道对象 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //给pipeline管道设置处理器 socketChannel.pipeline().addFirst(new MyServerValidateHandler()); socketChannel.pipeline().addLast(new MyServerHandler()); } });//给workerGroup的EventLoop对应的管道设置处理器 log.info("实时数据采集服务端准备就绪!!!!!!"); //绑定端口号,启动服务端 ChannelFuture channelFuture = null; try { channelFuture = bootstrap.bind(portt).sync(); } catch (InterruptedException e) { e.printStackTrace(); } //对关闭通道进行监听 try { channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { //创建两个线程组 boosGroup、workerGroup EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { //创建服务端的启动对象,设置参数 ServerBootstrap bootstrap = new ServerBootstrap(); //设置两个线程组boosGroup和workerGroup bootstrap.group(bossGroup, workerGroup) //设置服务端通道实现类型 .channel(NioServerSocketChannel.class) //设置线程队列得到连接个数 .option(ChannelOption.SO_BACKLOG, 128) //设置保持活动连接状态 .childOption(ChannelOption.SO_KEEPALIVE, true) //使用匿名内部类的形式初始化通道对象 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //给pipeline管道设置处理器 socketChannel.pipeline().addFirst(new MyServerValidateHandler()); socketChannel.pipeline().addLast(new MyServerHandler()); } });//给workerGroup的EventLoop对应的管道设置处理器 log.info("实时数据采集服务端准备就绪!!!!!!"); //绑定端口号,启动服务端 ChannelFuture channelFuture = bootstrap.bind(portt).sync(); //对关闭通道进行监听 channelFuture.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
MyServerHandler
package com.zhetang.netty; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.zhetang.config.QueueConfig; import com.zhetang.mapper.mysqlMapper.BaseUserMapper; import com.zhetang.model.bo.realtime.*; import com.zhetang.model.mysql.lvms.BaseUser; import com.zhetang.model.pojo.DataResult; import com.zhetang.model.pojo.ErrorResult; import com.zhetang.rabbitmq.RabbitMqSender; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.validation.annotation.Validated; import javax.annotation.PostConstruct; import java.nio.charset.Charset; import java.security.NoSuchAlgorithmException; import java.security.interfaces.RSAPrivateKey; import java.security.spec.InvalidKeySpecException; import java.util.List; /** * Created with IntelliJ IDEA. * User:wq * Date:2021/4/19 * Time: 20:13 * Description: No Description */ /** * 自定义的Handler需要继承Netty规定好的HandlerAdapter * 才能被Netty框架所关联,有点类似SpringMVC的适配器模式 **/ @Component @Slf4j public class MyServerHandler extends ChannelInboundHandlerAdapter { @Autowired private BaseUserMapper userMapper; private static MyServerHandler myServerHandler; @Autowired private RabbitMqSender rabbitMqSender; @Autowired private QueueConfig queueConfig; @PostConstruct public void init() { myServerHandler = this; myServerHandler.userMapper = this.userMapper; myServerHandler.rabbitMqSender =this.rabbitMqSender; myServerHandler.queueConfig = this.queueConfig; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { BaseUserMapper userMapper = myServerHandler.userMapper; RabbitMqSender rabbitMqSender = myServerHandler.rabbitMqSender; //获取客户端发送过来的消息 FirmCheckDataBO data = null; FirmCheckDataBO checkData = null; String returnClientJson = null; String decodeData = null; ByteBuf byteBuf = (ByteBuf) msg; try { String json = byteBuf.toString(CharsetUtil.UTF_8); FirmCheckDataBO checkDataBO = JSONObject.parseObject(json, FirmCheckDataBO.class); checkData = verifyCheckData(checkDataBO); decodeData = getDecodeData(checkDataBO, userMapper); log.info("解密后:"+decodeData); }catch (RuntimeException e){ e.printStackTrace(); DataResult dataResult = new DataResult(false, checkData.getDataId(), new ErrorResult("未授权的访问")); // ctx.writeAndFlush(Unpooled.copiedBuffer(JSON.toJSONString(dataResult), CharsetUtil.UTF_8)); ctx.writeAndFlush(Unpooled.copiedBuffer(JSON.toJSONString(dataResult), Charset.forName("GBK"))); throw new RuntimeException("认证token有误!!!!!!"); } try{ //验证企业数据是否已填 returnClientJson = sendQueueByType(checkData, decodeData,rabbitMqSender); DataResult dataResult = new DataResult(true, checkData.getDataId()); // ctx.writeAndFlush(Unpooled.copiedBuffer(JSON.toJSONString(dataResult), CharsetUtil.UTF_8)); ctx.writeAndFlush(Unpooled.copiedBuffer(JSON.toJSONString(dataResult), Charset.forName("GBK"))); }catch (RuntimeException e) { e.printStackTrace(); if (e.getCause() != null) { log.error(e.getCause().getCause().toString()); DataResult dataResult = new DataResult(false, checkData.getDataId(),new ErrorResult(e.getCause().getCause().toString())); // ctx.writeAndFlush(Unpooled.copiedBuffer( JSON.toJSONString(dataResult), CharsetUtil.UTF_8)); ctx.writeAndFlush(Unpooled.copiedBuffer( JSON.toJSONString(dataResult), Charset.forName("GBK"))); throw new RuntimeException("字段未填!!!!!!"); }else { DataResult dataResult = new DataResult(false, checkData.getDataId(),new ErrorResult(e.toString())); ctx.writeAndFlush(Unpooled.copiedBuffer( JSON.toJSONString(dataResult), Charset.forName("GBK"))); throw new RuntimeException("dataType传值有误!!!!!!"); } } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { //发送消息给客户端 // ctx.writeAndFlush(Unpooled.copiedBuffer("@@", CharsetUtil.UTF_8)); ctx.writeAndFlush(Unpooled.copiedBuffer("@@", Charset.forName("GBK"))); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { //发生异常,关闭通道 log.error("ctx检测到"+ctx.channel().remoteAddress()+"异常!!!!!!",cause); ctx.close(); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { log.error("ctx检测到"+ctx.channel().remoteAddress()+"已断线!!!!!!"); super.channelInactive(ctx); } FirmCheckDataBO verifyCheckData(@Validated FirmCheckDataBO firmCheckDataBO){ FirmCheckDataBO checkDataBO = new FirmCheckDataBO(); BeanUtils.copyProperties(firmCheckDataBO,checkDataBO); return checkDataBO; } /** * 校验 安全监测点位、工艺监测DCS监测点位实时数据 * @param dcsDetectRealTimeBO * @return */ DcsDetectRealTimeBO verifyDcsDetectRealTimeData(@Validated DcsDetectRealTimeBO dcsDetectRealTimeBO){ DcsDetectRealTimeBO detectRealTimeBO = new DcsDetectRealTimeBO(); BeanUtils.copyProperties(dcsDetectRealTimeBO,detectRealTimeBO); return detectRealTimeBO; } /** * 校验 安全监测点位、生产⼯艺(治污工艺)DCS监测点位报警数据 * @param dcsDetectAlarmDataBO * @return */ DcsDetectAlarmDataBO verifyDcsDetectAlarmData(@Validated DcsDetectAlarmDataBO dcsDetectAlarmDataBO){ DcsDetectAlarmDataBO detectAlarmDataBO = new DcsDetectAlarmDataBO(); BeanUtils.copyProperties(dcsDetectAlarmDataBO,detectAlarmDataBO); return detectAlarmDataBO; } /** * 校验 消防点位报警数据 * @param firePointAlarmPointBO * @return */ FirePointAlarmPointBO verifyFirePoint(@Validated FirePointAlarmPointBO firePointAlarmPointBO){ FirePointAlarmPointBO firePointAlarmPoint = new FirePointAlarmPointBO(); BeanUtils.copyProperties(firePointAlarmPointBO,firePointAlarmPoint); return firePointAlarmPoint; } /** * 校验 能源消耗数据 * @param energyExpendDataBO * @return */ EnergyExpendDataBO verifyEnergyExpendData(@Validated EnergyExpendDataBO energyExpendDataBO){ EnergyExpendDataBO energyExpendData = new EnergyExpendDataBO(); BeanUtils.copyProperties(energyExpendDataBO,energyExpendData); return energyExpendData; } /** * 校验 企业车辆出入 * @param enterpriseCarComeRealTimeBO * @return */ EnterpriseCarComeRealTimeBO verifyEnterpriseCarRealTimeData(@Validated EnterpriseCarComeRealTimeBO enterpriseCarComeRealTimeBO){ EnterpriseCarComeRealTimeBO carComeRealTimeBO = new EnterpriseCarComeRealTimeBO(); BeanUtils.copyProperties(enterpriseCarComeRealTimeBO,carComeRealTimeBO); return carComeRealTimeBO; } /** * 校验 二道门人员出入 * @param twoDoorsInOutBO * @return */ TwoDoorsInOutBO verifyTwoDoorsInOut(@Validated TwoDoorsInOutBO twoDoorsInOutBO){ TwoDoorsInOutBO twoDoorsInOut = new TwoDoorsInOutBO(); BeanUtils.copyProperties(twoDoorsInOutBO,twoDoorsInOut); return twoDoorsInOut; } /** * 校验 二道门车间人员统计 * @param twoDoorsWorkShopTotalBO * @return */ TwoDoorsWorkShopTotalBO verifyTwoDoorsWorkshop(@Validated TwoDoorsWorkShopTotalBO twoDoorsWorkShopTotalBO){ TwoDoorsWorkShopTotalBO twoDoorsWorkShopTotal = new TwoDoorsWorkShopTotalBO(); BeanUtils.copyProperties(twoDoorsWorkShopTotalBO,twoDoorsWorkShopTotal); return twoDoorsWorkShopTotal; } /** * 校验 二道门报警 * @param electricRailBO * @return */ ElectricRailBO verifyTwoDoorsAlarm(@Validated ElectricRailBO electricRailBO){ ElectricRailBO twoDoorsAlarm = new ElectricRailBO(); BeanUtils.copyProperties(electricRailBO,twoDoorsAlarm); return twoDoorsAlarm; } /** * 校验 人员求助报警 * @param personPositionAlarmBO * @return */ PersonPositionAlarmBO verifyPersonPosition(@Validated PersonPositionAlarmBO personPositionAlarmBO){ PersonPositionAlarmBO personPositionAlarm = new PersonPositionAlarmBO(); BeanUtils.copyProperties(personPositionAlarmBO,personPositionAlarm); return personPositionAlarm; } /** * 校验 电子围栏 * @param electricRailBO * @return */ ElectricRailBO verifyElectricRail(@Validated ElectricRailBO electricRailBO){ ElectricRailBO electricRail = new ElectricRailBO(); BeanUtils.copyProperties(electricRailBO,electricRail); return electricRail; } /** *得到解密数据 * @param firmCheckDataBO * @param userMapper * @return */ public String getDecodeData(FirmCheckDataBO firmCheckDataBO,BaseUserMapper userMapper) { String cipherText = ""; BaseUser baseUser = userMapper.selectByPrimaryKey(firmCheckDataBO.getAppid()); RSAPrivateKey privateKey = null; try { privateKey = RSAUtil.getPrivateKey(baseUser.getRsaPrivateKey()); } catch (NoSuchAlgorithmException e) { e.printStackTrace(); } catch (InvalidKeySpecException e) { e.printStackTrace(); } //解密得到desKey String desKey = RSAUtil.privateDecrypt(firmCheckDataBO.getPublicKey(), privateKey); System.out.println("deskey:"+desKey); byte[] bytes = new byte[0]; try { bytes = RSAUtil.decryptBASE64(firmCheckDataBO.getEncryptData()); } catch (Exception e) { e.printStackTrace(); } //des解密 byte[] dataByte = DESUtil.desDecrypt(bytes, desKey); cipherText = new String(dataByte); return cipherText; } /** * 根据type值的不同,发往不同的队列 * @param checkData 基本业务字段 * @param decodeData 解密后data json */ String sendQueueByType(FirmCheckDataBO checkData,String decodeData,RabbitMqSender rabbitMqSender){ String returnClientJson = null; if(checkData != null && checkData.getDataType() != null){ switch (checkData.getDataType()){ case "Am0": // DcsDetectRealTimeBO dcsDetectRealTimeBO = JSON.parseObject(decodeData, DcsDetectRealTimeBO.class); List<DcsDetectRealTimeBO> dcsDetectRealTimeBOS = JSON.parseArray(decodeData, DcsDetectRealTimeBO.class); if(dcsDetectRealTimeBOS!= null&&dcsDetectRealTimeBOS.size()>0){ for (DcsDetectRealTimeBO dcsDetectRealTimeBO: dcsDetectRealTimeBOS) { DcsDetectRealTimeBO dcsDetectRealTime = verifyDcsDetectRealTimeData(dcsDetectRealTimeBO); returnClientJson = JSONArray.toJSONString(dcsDetectRealTime); rabbitMqSender.sendDetectRealTime(returnClientJson); } } break; case "Aa0": // DcsDetectAlarmDataBO dcsDetectAlarmDataBO = JSON.parseObject(decodeData, DcsDetectAlarmDataBO.class); List<DcsDetectAlarmDataBO> dcsDetectAlarmDataBOs = JSON.parseArray(decodeData, DcsDetectAlarmDataBO.class); if(dcsDetectAlarmDataBOs != null && dcsDetectAlarmDataBOs.size()>0){ for (DcsDetectAlarmDataBO dcsDetectAlarmDataBO: dcsDetectAlarmDataBOs) { DcsDetectAlarmDataBO dcsDetectAlarmData = verifyDcsDetectAlarmData(dcsDetectAlarmDataBO); returnClientJson = JSONArray.toJSONString(dcsDetectAlarmData); rabbitMqSender.sendDetectAlarm(returnClientJson); } } break; case "Xa0": // FirePointAlarmPointBO firePointAlarmPoint = JSON.parseObject(decodeData, FirePointAlarmPointBO.class); List<FirePointAlarmPointBO> firePointAlarmPoints = JSON.parseArray(decodeData, FirePointAlarmPointBO.class); if(firePointAlarmPoints != null && firePointAlarmPoints.size()>0){ for (FirePointAlarmPointBO firePointAlarmPoint: firePointAlarmPoints) { FirePointAlarmPointBO firePointAlarmPointBO = verifyFirePoint(firePointAlarmPoint); returnClientJson = JSONArray.toJSONString(firePointAlarmPointBO); rabbitMqSender.sendFirePoint(returnClientJson); } } break; case "Nm0": // EnergyExpendDataBO energyExpendData = JSON.parseObject(decodeData, EnergyExpendDataBO.class); List<EnergyExpendDataBO> energyExpendDatas = JSON.parseArray(decodeData, EnergyExpendDataBO.class); if(energyExpendDatas != null && energyExpendDatas.size()>0){ if(energyExpendDatas!=null &&energyExpendDatas.size()>0){ for (EnergyExpendDataBO energyExpendData: energyExpendDatas) { EnergyExpendDataBO energyExpendDataBO = verifyEnergyExpendData(energyExpendData); returnClientJson = JSONArray.toJSONString(energyExpendDataBO); rabbitMqSender.sendEnergyExpend(returnClientJson); } } } break; case "Fm2": // EnterpriseOnDutyBO enterpriseOnDuty = JSON.parseObject(decodeData, EnterpriseOnDutyBO.class); List<EnterpriseCarComeRealTimeBO> enterpriseCarComeRealTimeBOS = JSON.parseArray(decodeData, EnterpriseCarComeRealTimeBO.class); if(enterpriseCarComeRealTimeBOS != null && enterpriseCarComeRealTimeBOS.size()>0){ for (EnterpriseCarComeRealTimeBO enterpriseCarComeRealTimeBO: enterpriseCarComeRealTimeBOS) { EnterpriseCarComeRealTimeBO enterpriseCarComeRealTimeBO1 = verifyEnterpriseCarRealTimeData(enterpriseCarComeRealTimeBO); returnClientJson = JSONArray.toJSONString(enterpriseCarComeRealTimeBO1); rabbitMqSender.sendEnterpriseOnDuty(returnClientJson); } } break; case "Fm1": // TwoDoorsInOutBO twoDoorsInOut = JSON.parseObject(decodeData, TwoDoorsInOutBO.class); List<TwoDoorsInOutBO> twoDoorsInOuts = JSON.parseArray(decodeData, TwoDoorsInOutBO.class); if(twoDoorsInOuts != null && twoDoorsInOuts.size()>0){ for (TwoDoorsInOutBO twoDoorsInOut: twoDoorsInOuts) { TwoDoorsInOutBO twoDoorsInOutBO = verifyTwoDoorsInOut(twoDoorsInOut); returnClientJson = JSONArray.toJSONString(twoDoorsInOutBO); rabbitMqSender.sendTwoDoorsInout(returnClientJson); } } break; case "Fm0": // TwoDoorsWorkShopTotalBO twoDoorsWorkShopTotal = JSON.parseObject(decodeData, TwoDoorsWorkShopTotalBO.class); List<TwoDoorsWorkShopTotalBO> twoDoorsWorkShopTotals = JSON.parseArray(decodeData, TwoDoorsWorkShopTotalBO.class); if(twoDoorsWorkShopTotals!= null && twoDoorsWorkShopTotals.size()>0){ for (TwoDoorsWorkShopTotalBO twoDoorsWorkShopTotal: twoDoorsWorkShopTotals) { TwoDoorsWorkShopTotalBO twoDoorsWorkShopTotalBO = verifyTwoDoorsWorkshop(twoDoorsWorkShopTotal); returnClientJson = JSONArray.toJSONString(twoDoorsWorkShopTotalBO); rabbitMqSender.sendTwoDoorsWorkshop(returnClientJson); } } break; case "Fa3": // TwoDoorsAlarmBO twoDoorsAlarm = JSON.parseObject(decodeData, TwoDoorsAlarmBO.class); List<PersonPositionAlarmBO> personPositionAlarmBOS = JSON.parseArray(decodeData, PersonPositionAlarmBO.class); if(personPositionAlarmBOS!= null && personPositionAlarmBOS.size()>0){ for (PersonPositionAlarmBO personPositionAlarmBO: personPositionAlarmBOS) { PersonPositionAlarmBO personPositionAlarmBO1 = verifyPersonPosition(personPositionAlarmBO); returnClientJson = JSONArray.toJSONString(personPositionAlarmBO1); rabbitMqSender.sendTwoDoorsAlarm(returnClientJson); } } break; case "Fa4": // TwoDoorsAlarmBO twoDoorsAlarm = JSON.parseObject(decodeData, TwoDoorsAlarmBO.class); List<ElectricRailBO> electricRailBOS = JSON.parseArray(decodeData, ElectricRailBO.class); if(electricRailBOS!= null && electricRailBOS.size()>0){ for (ElectricRailBO electricRailBO: electricRailBOS) { ElectricRailBO electricRail = verifyElectricRail(electricRailBO); returnClientJson = JSONArray.toJSONString(electricRail); rabbitMqSender.sendElectronicRail(returnClientJson); } } break; default: throw new RuntimeException("dataType传值有误"); } } return returnClientJson; } }
MyServerValidateHandler
package com.zhetang.netty; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.zhetang.config.QueueConfig; import com.zhetang.mapper.mysqlMapper.BaseUserMapper; import com.zhetang.model.bo.realtime.FirmCheckDataBO; import com.zhetang.model.mysql.lvms.BaseUser; import com.zhetang.model.pojo.DataResult; import com.zhetang.model.pojo.ErrorResult; import com.zhetang.rabbitmq.RabbitMqSender; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.validation.annotation.Validated; import javax.annotation.PostConstruct; import java.nio.charset.Charset; /** * Created with IntelliJ IDEA. * User:wq * Date:2021/4/26 * Time: 15:35 * Description: No Description */ @Component @Slf4j public class MyServerValidateHandler extends ChannelInboundHandlerAdapter { @Autowired private BaseUserMapper userMapper; private static MyServerValidateHandler myServerValidateHandler; @PostConstruct public void init() { myServerValidateHandler = this; myServerValidateHandler.userMapper = this.userMapper; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { FirmCheckDataBO checkData = null; FirmCheckDataBO checkDataBO = null; DataResult dataResult = null; BaseUserMapper userMapper = myServerValidateHandler.userMapper; ByteBuf byteBuf = (ByteBuf) msg; String json = byteBuf.toString(CharsetUtil.UTF_8); log.info(ctx.channel().remoteAddress()+"发送消息:"+byteBuf.toString(CharsetUtil.UTF_8)); try{ checkDataBO = JSONObject.parseObject(json, FirmCheckDataBO.class); checkData = verifyCheckData(checkDataBO); }catch (RuntimeException e){ if (e.getCause() != null) { log.error(e.getCause().getCause().toString()); e.printStackTrace(); if(checkData != null|| checkDataBO != null){ dataResult = new DataResult(false, checkDataBO.getDataId(),new ErrorResult(e.getCause().getCause().toString())); } else { dataResult = new DataResult(false, "dataId不能为空",new ErrorResult(e.getCause().getCause().toString())); } // ctx.writeAndFlush(Unpooled.copiedBuffer(JSON.toJSONString(dataResult), CharsetUtil.UTF_8)); ctx.writeAndFlush(Unpooled.copiedBuffer(JSON.toJSONString(dataResult), Charset.forName("GBK"))); throw new RuntimeException("字段未填,连接失败!!!"); } } try{ BaseUser baseUser = userMapper.selectByPrimaryKey(checkData.getAppid()); if(baseUser == null){ throw new RuntimeException("用户不存在"); } ctx.fireChannelRead(msg); }catch (RuntimeException e){ e.printStackTrace(); if(checkData != null){ dataResult = new DataResult(false, checkData.getDataId(), new ErrorResult("未授权的访问")); }else { dataResult = new DataResult(false, "dataId不能为空", new ErrorResult("未授权的访问")); } // ctx.writeAndFlush(Unpooled.copiedBuffer(JSON.toJSONString(dataResult), CharsetUtil.UTF_8)); ctx.writeAndFlush(Unpooled.copiedBuffer(JSON.toJSONString(dataResult), Charset.forName("GBK"))); throw new RuntimeException("认证token有误!!!!!!"); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { //发送消息给客户端 // ctx.writeAndFlush(Unpooled.copiedBuffer("@@", CharsetUtil.UTF_8)); ctx.writeAndFlush(Unpooled.copiedBuffer("@@", Charset.forName("GBK"))); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { //发生异常,关闭通道 log.error("ctx检测到"+ctx.channel().remoteAddress()+"异常!!!!!!",cause); ctx.close(); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { log.error("ctx检测到"+ctx.channel().remoteAddress()+"已断线!!!!!!"); super.channelInactive(ctx); } FirmCheckDataBO verifyCheckData(@Validated FirmCheckDataBO firmCheckDataBO){ FirmCheckDataBO checkDataBO = new FirmCheckDataBO(); BeanUtils.copyProperties(firmCheckDataBO,checkDataBO); return checkDataBO; } }
Application
package com.zhetang; import com.alibaba.fastjson.serializer.SerializerFeature; import com.alibaba.fastjson.support.config.FastJsonConfig; import com.alibaba.fastjson.support.spring.FastJsonHttpMessageConverter; import com.zhetang.netty.MyServer; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.http.HttpMessageConverters; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.http.converter.HttpMessageConverter; import org.springframework.scheduling.annotation.EnableScheduling; @SpringBootApplication @EnableScheduling @ConfigurationProperties public class GetsqlserverApplication { public static void main(String[] args) { SpringApplication.run(GetsqlserverApplication.class, args); MyServer.init(); } @Bean public HttpMessageConverters fastJsonHttpMessageConverters() { FastJsonHttpMessageConverter fastConverter = new FastJsonHttpMessageConverter(); FastJsonConfig fastJsonConfig = new FastJsonConfig(); fastJsonConfig.setSerializerFeatures(SerializerFeature.PrettyFormat); fastConverter.setFastJsonConfig(fastJsonConfig); HttpMessageConverter<?> converter = fastConverter; return new HttpMessageConverters(converter); } }
利用分隔符@@避免粘包半包现象
myServer
package com.zhetang.netty; import com.zhetang.mapper.mysqlMapper.BaseUserMapper; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.string.StringEncoder; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.Resource; import java.nio.charset.Charset; import java.util.concurrent.TimeUnit; /** * Created with IntelliJ IDEA. * User:wq * Date:2021/4/19 * Time: 20:12 * Description: No Description */ @Slf4j @Component public class MyServer { // @Value("${my.server.port}") // private static Integer port; @Resource private BaseUserMapper userMapper; private static final Integer portt =8082; public static void init(){ //创建两个线程组 boosGroup、workerGroup EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { //创建服务端的启动对象,设置参数 ServerBootstrap bootstrap = new ServerBootstrap(); //设置两个线程组boosGroup和workerGroup bootstrap.group(bossGroup, workerGroup) //设置服务端通道实现类型 .channel(NioServerSocketChannel.class) //设置线程队列得到连接个数 .option(ChannelOption.SO_BACKLOG, 128) //设置保持活动连接状态 .childOption(ChannelOption.SO_KEEPALIVE, true) //使用匿名内部类的形式初始化通道对象 // .childHandler(new ChannelInitializer<SocketChannel>() { // @Override // protected void initChannel(SocketChannel socketChannel) throws Exception { // 给pipeline管道设置处理器 // socketChannel.pipeline().addFirst(new MyServerValidateHandler()); // socketChannel.pipeline().addLast(new MyServerHandler()); // // } // });//给workerGroup的EventLoop对应的管道设置处理器 .childHandler(new SocketServerInitializer()); log.info("实时数据采集服务端准备就绪!!!!!!"); //绑定端口号,启动服务端 ChannelFuture channelFuture = null; try { channelFuture = bootstrap.bind(portt).sync(); } catch (InterruptedException e) { e.printStackTrace(); } //对关闭通道进行监听 try { channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { //创建两个线程组 boosGroup、workerGroup EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { //创建服务端的启动对象,设置参数 ServerBootstrap bootstrap = new ServerBootstrap(); //设置两个线程组boosGroup和workerGroup bootstrap.group(bossGroup, workerGroup) //设置服务端通道实现类型 .channel(NioServerSocketChannel.class) //设置线程队列得到连接个数 .option(ChannelOption.SO_BACKLOG, 128) //设置保持活动连接状态 .childOption(ChannelOption.SO_KEEPALIVE, true) //使用匿名内部类的形式初始化通道对象 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //给pipeline管道设置处理器 socketChannel.pipeline().addFirst(new MyServerValidateHandler()); socketChannel.pipeline().addLast(new MyServerHandler()); } });//给workerGroup的EventLoop对应的管道设置处理器 log.info("实时数据采集服务端准备就绪!!!!!!"); //绑定端口号,启动服务端 ChannelFuture channelFuture = bootstrap.bind(portt).sync(); //对关闭通道进行监听 channelFuture.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
SocketServerInitializer
package com.zhetang.netty; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; /** * Created with IntelliJ IDEA. * User:wq * Date:2021/5/8 * Time: 18:20 * Description: No Description */ public class SocketServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { // 告诉DelimiterBasedFrameDecoder以$_作为分隔符 ByteBuf delimiter = Unpooled.copiedBuffer("@@".getBytes()); ChannelPipeline pipeline = socketChannel.pipeline(); //这里的1024,表示单条消息的最大长度,当达到长度后,还没有找到分隔符,则抛出TooLongFrameException // pipeline.addLast(new LengthFieldBasedFrameDecoder(60*1024,0,2)); // pipeline.addLast(new DelimiterBasedFrameDecoder(10*1024,delimiter)); pipeline.addLast(new DelimiterBasedFrameDecoder(1024*1024*8,delimiter)); pipeline.addLast(new MyServerValidateHandler()); pipeline.addLast(new MyServerHandler()); } }
MyServerValidateHandler
package com.zhetang.netty; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.zhetang.config.QueueConfig; import com.zhetang.mapper.mysqlMapper.BaseUserMapper; import com.zhetang.model.bo.realtime.FirmCheckDataBO; import com.zhetang.model.mysql.lvms.BaseUser; import com.zhetang.model.pojo.DataResult; import com.zhetang.model.pojo.ErrorResult; import com.zhetang.rabbitmq.RabbitMqSender; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.validation.annotation.Validated; import javax.annotation.PostConstruct; import java.nio.charset.Charset; /** * Created with IntelliJ IDEA. * User:wq * Date:2021/4/26 * Time: 15:35 * Description: No Description */ @Component @Slf4j public class MyServerValidateHandler extends ChannelInboundHandlerAdapter { @Autowired private BaseUserMapper userMapper; private static MyServerValidateHandler myServerValidateHandler; @PostConstruct public void init() { myServerValidateHandler = this; myServerValidateHandler.userMapper = this.userMapper; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { FirmCheckDataBO checkData = null; FirmCheckDataBO checkDataBO = null; DataResult dataResult = null; BaseUserMapper userMapper = myServerValidateHandler.userMapper; ByteBuf byteBuf = (ByteBuf) msg; String json = byteBuf.toString(CharsetUtil.UTF_8); log.info(ctx.channel().remoteAddress()+"发送消息:"+byteBuf.toString(CharsetUtil.UTF_8)); try{ checkDataBO = JSONObject.parseObject(json, FirmCheckDataBO.class); checkData = verifyCheckData(checkDataBO); }catch (RuntimeException e){ if (e.getCause() != null) { log.error(e.getCause().getCause().toString()); e.printStackTrace(); if(checkData != null|| checkDataBO != null){ dataResult = new DataResult(false, checkDataBO.getDataId(),new ErrorResult(e.getCause().getCause().toString())); } else { dataResult = new DataResult(false, "dataId不能为空",new ErrorResult(e.getCause().getCause().toString())); } ctx.writeAndFlush(Unpooled.copiedBuffer(JSON.toJSONString(dataResult), CharsetUtil.UTF_8)); // ctx.writeAndFlush(Unpooled.copiedBuffer(JSON.toJSONString(dataResult), Charset.forName("GBK"))); throw new RuntimeException("字段未填,连接失败!!!"); } } try{ BaseUser baseUser = userMapper.selectByPrimaryKey(checkData.getAppid()); if(baseUser == null){ throw new RuntimeException("用户不存在"); } ctx.fireChannelRead(msg); }catch (RuntimeException e){ e.printStackTrace(); if(checkData != null){ dataResult = new DataResult(false, checkData.getDataId(), new ErrorResult("未授权的访问")); }else { dataResult = new DataResult(false, "dataId不能为空", new ErrorResult("未授权的访问")); } ctx.writeAndFlush(Unpooled.copiedBuffer(JSON.toJSONString(dataResult), CharsetUtil.UTF_8)); // ctx.writeAndFlush(Unpooled.copiedBuffer(JSON.toJSONString(dataResult), Charset.forName("GBK"))); throw new RuntimeException("认证token有误!!!!!!"); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { //发送消息给客户端 ctx.writeAndFlush(Unpooled.copiedBuffer("@@", CharsetUtil.UTF_8)); // ctx.writeAndFlush(Unpooled.copiedBuffer("@@", Charset.forName("GBK"))); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { //发生异常,关闭通道 log.error("ctx检测到"+ctx.channel().remoteAddress()+"异常!!!!!!",cause); ctx.close(); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { log.error("ctx检测到"+ctx.channel().remoteAddress()+"已断线!!!!!!"); super.channelInactive(ctx); } FirmCheckDataBO verifyCheckData(@Validated FirmCheckDataBO firmCheckDataBO){ FirmCheckDataBO checkDataBO = new FirmCheckDataBO(); BeanUtils.copyProperties(firmCheckDataBO,checkDataBO); return checkDataBO; } }
MyServerHandler
package com.zhetang.netty; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.zhetang.config.QueueConfig; import com.zhetang.mapper.mysqlMapper.BaseUserMapper; import com.zhetang.model.bo.realtime.*; import com.zhetang.model.mysql.lvms.BaseUser; import com.zhetang.model.pojo.DataResult; import com.zhetang.model.pojo.ErrorResult; import com.zhetang.rabbitmq.RabbitMqSender; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.validation.annotation.Validated; import javax.annotation.PostConstruct; import java.nio.charset.Charset; import java.security.NoSuchAlgorithmException; import java.security.interfaces.RSAPrivateKey; import java.security.spec.InvalidKeySpecException; import java.util.List; /** * Created with IntelliJ IDEA. * User:wq * Date:2021/4/19 * Time: 20:13 * Description: No Description */ /** * 自定义的Handler需要继承Netty规定好的HandlerAdapter * 才能被Netty框架所关联,有点类似SpringMVC的适配器模式 **/ @Component @Slf4j public class MyServerHandler extends ChannelInboundHandlerAdapter { @Autowired private BaseUserMapper userMapper; private static MyServerHandler myServerHandler; @Autowired private RabbitMqSender rabbitMqSender; @Autowired private QueueConfig queueConfig; @PostConstruct public void init() { myServerHandler = this; myServerHandler.userMapper = this.userMapper; myServerHandler.rabbitMqSender =this.rabbitMqSender; myServerHandler.queueConfig = this.queueConfig; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { BaseUserMapper userMapper = myServerHandler.userMapper; RabbitMqSender rabbitMqSender = myServerHandler.rabbitMqSender; //获取客户端发送过来的消息 FirmCheckDataBO data = null; FirmCheckDataBO checkData = null; String returnClientJson = null; String decodeData = null; ByteBuf byteBuf = (ByteBuf) msg; try { String json = byteBuf.toString(CharsetUtil.UTF_8); FirmCheckDataBO checkDataBO = JSONObject.parseObject(json, FirmCheckDataBO.class); checkData = verifyCheckData(checkDataBO); decodeData = getDecodeData(checkDataBO, userMapper); log.info("解密后:"+decodeData); }catch (RuntimeException e){ e.printStackTrace(); DataResult dataResult = new DataResult(false, checkData.getDataId(), new ErrorResult("未授权的访问")); ctx.writeAndFlush(Unpooled.copiedBuffer(JSON.toJSONString(dataResult), CharsetUtil.UTF_8)); //ctx.writeAndFlush(Unpooled.copiedBuffer(JSON.toJSONString(dataResult), Charset.forName("GBK"))); throw new RuntimeException("认证token有误!!!!!!"); } try{ //验证企业数据是否已填 returnClientJson = sendQueueByType(checkData, decodeData,rabbitMqSender); DataResult dataResult = new DataResult(true, checkData.getDataId()); ctx.writeAndFlush(Unpooled.copiedBuffer(JSON.toJSONString(dataResult), CharsetUtil.UTF_8)); // ctx.writeAndFlush(Unpooled.copiedBuffer(JSON.toJSONString(dataResult), Charset.forName("GBK"))); }catch (RuntimeException e) { e.printStackTrace(); if (e.getCause() != null) { log.error(e.getCause().getCause().toString()); DataResult dataResult = new DataResult(false, checkData.getDataId(),new ErrorResult(e.getCause().getCause().toString())); ctx.writeAndFlush(Unpooled.copiedBuffer(JSON.toJSONString(dataResult), CharsetUtil.UTF_8)); // ctx.writeAndFlush(Unpooled.copiedBuffer( JSON.toJSONString(dataResult), Charset.forName("GBK"))); throw new RuntimeException("字段未填!!!!!!"); }else { DataResult dataResult = new DataResult(false, checkData.getDataId(),new ErrorResult(e.toString())); ctx.writeAndFlush(Unpooled.copiedBuffer(JSON.toJSONString(dataResult), CharsetUtil.UTF_8)); // ctx.writeAndFlush(Unpooled.copiedBuffer( JSON.toJSONString(dataResult), Charset.forName("GBK"))); throw new RuntimeException("dataType传值有误!!!!!!"); } } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { //发送消息给客户端 ctx.writeAndFlush(Unpooled.copiedBuffer("@@", CharsetUtil.UTF_8)); // ctx.writeAndFlush(Unpooled.copiedBuffer("@@", Charset.forName("GBK"))); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { //发生异常,关闭通道 log.error("ctx检测到"+ctx.channel().remoteAddress()+"异常!!!!!!",cause); ctx.close(); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { log.error("ctx检测到"+ctx.channel().remoteAddress()+"已断线!!!!!!"); super.channelInactive(ctx); } FirmCheckDataBO verifyCheckData(@Validated FirmCheckDataBO firmCheckDataBO){ FirmCheckDataBO checkDataBO = new FirmCheckDataBO(); BeanUtils.copyProperties(firmCheckDataBO,checkDataBO); return checkDataBO; } /** * 校验 安全监测点位、工艺监测DCS监测点位实时数据 * @param dcsDetectRealTimeBO * @return */ DcsDetectRealTimeBO verifyDcsDetectRealTimeData(@Validated DcsDetectRealTimeBO dcsDetectRealTimeBO){ DcsDetectRealTimeBO detectRealTimeBO = new DcsDetectRealTimeBO(); BeanUtils.copyProperties(dcsDetectRealTimeBO,detectRealTimeBO); return detectRealTimeBO; } /** * 校验 安全监测点位、生产⼯艺(治污工艺)DCS监测点位报警数据 * @param dcsDetectAlarmDataBO * @return */ DcsDetectAlarmDataBO verifyDcsDetectAlarmData(@Validated DcsDetectAlarmDataBO dcsDetectAlarmDataBO){ DcsDetectAlarmDataBO detectAlarmDataBO = new DcsDetectAlarmDataBO(); BeanUtils.copyProperties(dcsDetectAlarmDataBO,detectAlarmDataBO); return detectAlarmDataBO; } /** * 校验 消防点位报警数据 * @param firePointAlarmPointBO * @return */ FirePointAlarmPointBO verifyFirePoint(@Validated FirePointAlarmPointBO firePointAlarmPointBO){ FirePointAlarmPointBO firePointAlarmPoint = new FirePointAlarmPointBO(); BeanUtils.copyProperties(firePointAlarmPointBO,firePointAlarmPoint); return firePointAlarmPoint; } /** * 校验 能源消耗数据 * @param energyExpendDataBO * @return */ EnergyExpendDataBO verifyEnergyExpendData(@Validated EnergyExpendDataBO energyExpendDataBO){ EnergyExpendDataBO energyExpendData = new EnergyExpendDataBO(); BeanUtils.copyProperties(energyExpendDataBO,energyExpendData); return energyExpendData; } /** * 校验 企业车辆出入 * @param enterpriseCarComeRealTimeBO * @return */ EnterpriseCarComeRealTimeBO verifyEnterpriseCarRealTimeData(@Validated EnterpriseCarComeRealTimeBO enterpriseCarComeRealTimeBO){ EnterpriseCarComeRealTimeBO carComeRealTimeBO = new EnterpriseCarComeRealTimeBO(); BeanUtils.copyProperties(enterpriseCarComeRealTimeBO,carComeRealTimeBO); return carComeRealTimeBO; } /** * 校验 二道门人员出入 * @param twoDoorsInOutBO * @return */ TwoDoorsInOutBO verifyTwoDoorsInOut(@Validated TwoDoorsInOutBO twoDoorsInOutBO){ TwoDoorsInOutBO twoDoorsInOut = new TwoDoorsInOutBO(); BeanUtils.copyProperties(twoDoorsInOutBO,twoDoorsInOut); return twoDoorsInOut; } /** * 校验 二道门车间人员统计 * @param twoDoorsWorkShopTotalBO * @return */ TwoDoorsWorkShopTotalBO verifyTwoDoorsWorkshop(@Validated TwoDoorsWorkShopTotalBO twoDoorsWorkShopTotalBO){ TwoDoorsWorkShopTotalBO twoDoorsWorkShopTotal = new TwoDoorsWorkShopTotalBO(); BeanUtils.copyProperties(twoDoorsWorkShopTotalBO,twoDoorsWorkShopTotal); return twoDoorsWorkShopTotal; } /** * 校验 二道门报警 * @param electricRailBO * @return */ ElectricRailBO verifyTwoDoorsAlarm(@Validated ElectricRailBO electricRailBO){ ElectricRailBO twoDoorsAlarm = new ElectricRailBO(); BeanUtils.copyProperties(electricRailBO,twoDoorsAlarm); return twoDoorsAlarm; } /** * 校验 人员求助报警 * @param personPositionAlarmBO * @return */ PersonPositionAlarmBO verifyPersonPosition(@Validated PersonPositionAlarmBO personPositionAlarmBO){ PersonPositionAlarmBO personPositionAlarm = new PersonPositionAlarmBO(); BeanUtils.copyProperties(personPositionAlarmBO,personPositionAlarm); return personPositionAlarm; } /** * 校验 电子围栏 * @param electricRailBO * @return */ ElectricRailBO verifyElectricRail(@Validated ElectricRailBO electricRailBO){ ElectricRailBO electricRail = new ElectricRailBO(); BeanUtils.copyProperties(electricRailBO,electricRail); return electricRail; } /** *得到解密数据 * @param firmCheckDataBO * @param userMapper * @return */ public String getDecodeData(FirmCheckDataBO firmCheckDataBO,BaseUserMapper userMapper) { String cipherText = ""; BaseUser baseUser = userMapper.selectByPrimaryKey(firmCheckDataBO.getAppid()); RSAPrivateKey privateKey = null; try { privateKey = RSAUtil.getPrivateKey(baseUser.getRsaPrivateKey()); } catch (NoSuchAlgorithmException e) { e.printStackTrace(); } catch (InvalidKeySpecException e) { e.printStackTrace(); } //解密得到desKey String desKey = RSAUtil.privateDecrypt(firmCheckDataBO.getPublicKey(), privateKey); System.out.println("deskey:"+desKey); byte[] bytes = new byte[0]; try { bytes = RSAUtil.decryptBASE64(firmCheckDataBO.getEncryptData()); } catch (Exception e) { e.printStackTrace(); } //des解密 byte[] dataByte = DESUtil.desDecrypt(bytes, desKey); cipherText = new String(dataByte); return cipherText; } /** * 根据type值的不同,发往不同的队列 * @param checkData 基本业务字段 * @param decodeData 解密后data json */ String sendQueueByType(FirmCheckDataBO checkData,String decodeData,RabbitMqSender rabbitMqSender){ String returnClientJson = null; String jsonString = null; if(checkData != null && checkData.getDataType() != null){ switch (checkData.getDataType()){ case "Am0": // DcsDetectRealTimeBO dcsDetectRealTimeBO = JSON.parseObject(decodeData, DcsDetectRealTimeBO.class); List<DcsDetectRealTimeBO> dcsDetectRealTimeBOS = JSON.parseArray(decodeData, DcsDetectRealTimeBO.class); if(dcsDetectRealTimeBOS!= null&&dcsDetectRealTimeBOS.size()>0){ for (DcsDetectRealTimeBO dcsDetectRealTimeBO: dcsDetectRealTimeBOS) { DcsDetectRealTimeBO dcsDetectRealTime = verifyDcsDetectRealTimeData(dcsDetectRealTimeBO); jsonString = JSONArray.toJSONString(dcsDetectRealTime); checkData.setEncryptData(jsonString); returnClientJson = JSONArray.toJSONString(checkData); rabbitMqSender.sendDetectRealTime(returnClientJson); } } break; case "Aa0": // DcsDetectAlarmDataBO dcsDetectAlarmDataBO = JSON.parseObject(decodeData, DcsDetectAlarmDataBO.class); List<DcsDetectAlarmDataBO> dcsDetectAlarmDataBOs = JSON.parseArray(decodeData, DcsDetectAlarmDataBO.class); if(dcsDetectAlarmDataBOs != null && dcsDetectAlarmDataBOs.size()>0){ for (DcsDetectAlarmDataBO dcsDetectAlarmDataBO: dcsDetectAlarmDataBOs) { DcsDetectAlarmDataBO dcsDetectAlarmData = verifyDcsDetectAlarmData(dcsDetectAlarmDataBO); jsonString = JSONArray.toJSONString(dcsDetectAlarmData); checkData.setEncryptData(jsonString); returnClientJson = JSON.toJSONString(checkData); rabbitMqSender.sendDetectAlarm(returnClientJson); } } break; case "Xa0": // FirePointAlarmPointBO firePointAlarmPoint = JSON.parseObject(decodeData, FirePointAlarmPointBO.class); List<FirePointAlarmPointBO> firePointAlarmPoints = JSON.parseArray(decodeData, FirePointAlarmPointBO.class); if(firePointAlarmPoints != null && firePointAlarmPoints.size()>0){ for (FirePointAlarmPointBO firePointAlarmPoint: firePointAlarmPoints) { FirePointAlarmPointBO firePointAlarmPointBO = verifyFirePoint(firePointAlarmPoint); jsonString = JSONArray.toJSONString(firePointAlarmPointBO); checkData.setEncryptData(jsonString); returnClientJson = JSONArray.toJSONString(checkData); rabbitMqSender.sendFirePoint(returnClientJson); } } break; case "Nm0": // EnergyExpendDataBO energyExpendData = JSON.parseObject(decodeData, EnergyExpendDataBO.class); List<EnergyExpendDataBO> energyExpendDatas = JSON.parseArray(decodeData, EnergyExpendDataBO.class); if(energyExpendDatas != null && energyExpendDatas.size()>0){ if(energyExpendDatas!=null &&energyExpendDatas.size()>0){ for (EnergyExpendDataBO energyExpendData: energyExpendDatas) { EnergyExpendDataBO energyExpendDataBO = verifyEnergyExpendData(energyExpendData); jsonString = JSONArray.toJSONString(energyExpendDataBO); checkData.setEncryptData(jsonString); returnClientJson = JSONArray.toJSONString(checkData); rabbitMqSender.sendEnergyExpend(returnClientJson); } } } break; case "Fm2": // EnterpriseOnDutyBO enterpriseOnDuty = JSON.parseObject(decodeData, EnterpriseOnDutyBO.class); List<EnterpriseCarComeRealTimeBO> enterpriseCarComeRealTimeBOS = JSON.parseArray(decodeData, EnterpriseCarComeRealTimeBO.class); if(enterpriseCarComeRealTimeBOS != null && enterpriseCarComeRealTimeBOS.size()>0){ for (EnterpriseCarComeRealTimeBO enterpriseCarComeRealTimeBO: enterpriseCarComeRealTimeBOS) { EnterpriseCarComeRealTimeBO enterpriseCarComeRealTimeBO1 = verifyEnterpriseCarRealTimeData(enterpriseCarComeRealTimeBO); jsonString = JSONArray.toJSONString(enterpriseCarComeRealTimeBO1); checkData.setEncryptData(jsonString); returnClientJson = JSONArray.toJSONString(checkData); rabbitMqSender.sendEnterpriseOnDuty(returnClientJson); } } break; case "Fm1": // TwoDoorsInOutBO twoDoorsInOut = JSON.parseObject(decodeData, TwoDoorsInOutBO.class); List<TwoDoorsInOutBO> twoDoorsInOuts = JSON.parseArray(decodeData, TwoDoorsInOutBO.class); if(twoDoorsInOuts != null && twoDoorsInOuts.size()>0){ for (TwoDoorsInOutBO twoDoorsInOut: twoDoorsInOuts) { TwoDoorsInOutBO twoDoorsInOutBO = verifyTwoDoorsInOut(twoDoorsInOut); jsonString = JSONArray.toJSONString(twoDoorsInOutBO); checkData.setEncryptData(jsonString); returnClientJson = JSONArray.toJSONString(checkData); rabbitMqSender.sendTwoDoorsInout(returnClientJson); } } break; case "Fm0": // TwoDoorsWorkShopTotalBO twoDoorsWorkShopTotal = JSON.parseObject(decodeData, TwoDoorsWorkShopTotalBO.class); List<TwoDoorsWorkShopTotalBO> twoDoorsWorkShopTotals = JSON.parseArray(decodeData, TwoDoorsWorkShopTotalBO.class); if(twoDoorsWorkShopTotals!= null && twoDoorsWorkShopTotals.size()>0){ for (TwoDoorsWorkShopTotalBO twoDoorsWorkShopTotal: twoDoorsWorkShopTotals) { TwoDoorsWorkShopTotalBO twoDoorsWorkShopTotalBO = verifyTwoDoorsWorkshop(twoDoorsWorkShopTotal); jsonString = JSONArray.toJSONString(twoDoorsWorkShopTotalBO); checkData.setEncryptData(jsonString); returnClientJson = JSONArray.toJSONString(checkData); rabbitMqSender.sendTwoDoorsWorkshop(returnClientJson); } } break; case "Fa3": // TwoDoorsAlarmBO twoDoorsAlarm = JSON.parseObject(decodeData, TwoDoorsAlarmBO.class); List<PersonPositionAlarmBO> personPositionAlarmBOS = JSON.parseArray(decodeData, PersonPositionAlarmBO.class); if(personPositionAlarmBOS!= null && personPositionAlarmBOS.size()>0){ for (PersonPositionAlarmBO personPositionAlarmBO: personPositionAlarmBOS) { PersonPositionAlarmBO personPositionAlarmBO1 = verifyPersonPosition(personPositionAlarmBO); jsonString = JSONArray.toJSONString(personPositionAlarmBO1); checkData.setEncryptData(jsonString); returnClientJson = JSONArray.toJSONString(checkData); rabbitMqSender.sendTwoDoorsAlarm(returnClientJson); } } break; case "Fa4": // TwoDoorsAlarmBO twoDoorsAlarm = JSON.parseObject(decodeData, TwoDoorsAlarmBO.class); List<ElectricRailBO> electricRailBOS = JSON.parseArray(decodeData, ElectricRailBO.class); if(electricRailBOS!= null && electricRailBOS.size()>0){ for (ElectricRailBO electricRailBO: electricRailBOS) { ElectricRailBO electricRail = verifyElectricRail(electricRailBO); jsonString = JSONArray.toJSONString(electricRail); checkData.setEncryptData(jsonString); returnClientJson = JSONArray.toJSONString(checkData); rabbitMqSender.sendElectronicRail(returnClientJson); } } break; default: throw new RuntimeException("dataType传值有误"); } } return returnClientJson; } }
DESUtil
package com.zhetang.netty; import javax.crypto.Cipher; import javax.crypto.KeyGenerator; import javax.crypto.SecretKey; import javax.crypto.SecretKeyFactory; import javax.crypto.spec.DESKeySpec; import javax.crypto.spec.IvParameterSpec; import java.security.NoSuchAlgorithmException; /** * Created with IntelliJ IDEA. * User: lzx * Date: 2020/6/23 * Time: 9:29 * Description: No Description */ public class DESUtil { /** * 生成KEY */ public static byte[] getKey(){ KeyGenerator keyGenerator = null; try { keyGenerator = KeyGenerator.getInstance("DES"); } catch (NoSuchAlgorithmException e) { e.printStackTrace(); System.out.println("----------------KEY生成失败!"); return null; } keyGenerator.init(56); SecretKey secretKey = keyGenerator.generateKey(); byte[] byteKey = secretKey.getEncoded(); return byteKey; } /** * DES加密 * * @param context * @return */ public static byte[] desEncrypt(String context, String key) { try { // KEY转换 DESKeySpec desKeySpec = new DESKeySpec(key.getBytes("UTF-8")); SecretKeyFactory secretKeyFactory = SecretKeyFactory.getInstance("DES"); SecretKey conventSecretKey = secretKeyFactory.generateSecret(desKeySpec); // 加密 Cipher cipher = Cipher.getInstance("DES/CBC/PKCS5Padding"); cipher.init(Cipher.ENCRYPT_MODE, conventSecretKey, new IvParameterSpec(key.getBytes("UTF-8"))); return cipher.doFinal(context.getBytes("UTF-8")); } catch (Throwable e) { e.printStackTrace(); return null; } } /** * DES解密 * * @param context * @return */ public static byte[] desDecrypt(byte[] context, String key) { try { // KEY转换 DESKeySpec desKeySpec = new DESKeySpec(key.getBytes("UTF-8")); SecretKeyFactory secretKeyFactory = SecretKeyFactory.getInstance("DES"); SecretKey conventSecretKey = secretKeyFactory.generateSecret(desKeySpec); // 解密 Cipher cipher = Cipher.getInstance("DES/CBC/PKCS5Padding"); cipher.init(Cipher.DECRYPT_MODE, conventSecretKey, new IvParameterSpec(key.getBytes("UTF-8"))); return cipher.doFinal(context); } catch (Throwable e) { e.printStackTrace(); return null; } } }
RSAUtil
package com.zhetang.netty; import org.apache.commons.codec.binary.Base64; import org.apache.tomcat.util.http.fileupload.IOUtils; import javax.crypto.Cipher; import java.io.BufferedWriter; import java.io.ByteArrayOutputStream; import java.io.FileWriter; import java.security.*; import java.security.interfaces.RSAPrivateKey; import java.security.interfaces.RSAPublicKey; import java.security.spec.InvalidKeySpecException; import java.security.spec.PKCS8EncodedKeySpec; import java.security.spec.X509EncodedKeySpec; import org.apache.commons.codec.binary.Base64; /** * Created with IntelliJ IDEA. * User: lzx * Date: 2020/6/23 * Time: 9:30 * Description: No Description */ public class RSAUtil { /** * 转密钥字符串(base64编码) * * @return */ public static String getKeyString(Key key) throws Exception { byte[] keyBytes = key.getEncoded(); // String s = new String(Base64.encodeBase64(keyBytes)); String s = new String(Base64.encodeBase64(keyBytes)); return s; } /** * 得到公钥 * @param publicKey 密钥字符串(经过base64编码) * @throws Exception */ public static RSAPublicKey getPublicKey(String publicKey) throws NoSuchAlgorithmException, InvalidKeySpecException { //通过X509编码的Key指令获得公钥对象 KeyFactory keyFactory = KeyFactory.getInstance("RSA"); X509EncodedKeySpec x509KeySpec = new X509EncodedKeySpec(Base64.decodeBase64(publicKey)); RSAPublicKey key = (RSAPublicKey) keyFactory.generatePublic(x509KeySpec); return key; } /** * 得到私钥 * @param privateKey 密钥字符串(经过base64编码) * @throws Exception */ public static RSAPrivateKey getPrivateKey(String privateKey) throws NoSuchAlgorithmException, InvalidKeySpecException { //通过PKCS#8编码的Key指令获得私钥对象 KeyFactory keyFactory = KeyFactory.getInstance("RSA"); PKCS8EncodedKeySpec pkcs8KeySpec = new PKCS8EncodedKeySpec(Base64.decodeBase64(privateKey)); RSAPrivateKey key = (RSAPrivateKey) keyFactory.generatePrivate(pkcs8KeySpec); return key; } /** * 生成密钥对 * @param pubfilePath 公钥存放的文件路径 * @param prifilePath 私钥存放的文件路径 */ public static void genKeyPair(String pubfilePath, String prifilePath) { // KeyPairGenerator类用于生成公钥和私钥对,基于RSA算法生成对象 KeyPairGenerator keyPairGen = null; try { keyPairGen = KeyPairGenerator.getInstance("RSA"); } catch (NoSuchAlgorithmException e) { e.printStackTrace(); } // 初始化密钥对生成器,密钥大小为96-1024位 keyPairGen.initialize(1024, new SecureRandom()); // 生成一个密钥对,保存在keyPair中 KeyPair keyPair = keyPairGen.generateKeyPair(); // 得到私钥 RSAPrivateKey privateKey = (RSAPrivateKey) keyPair.getPrivate(); // 得到公钥 RSAPublicKey publicKey = (RSAPublicKey) keyPair.getPublic(); try { // 得到公钥字符串 String publicKeyString = getKeyString(publicKey); // 得到私钥字符串 String privateKeyString = getKeyString(privateKey); // 将密钥对写入到文件 FileWriter pubfw = new FileWriter(pubfilePath); FileWriter prifw = new FileWriter(prifilePath); BufferedWriter pubbw = new BufferedWriter(pubfw); BufferedWriter pribw = new BufferedWriter(prifw); pubbw.write(publicKeyString); pribw.write(privateKeyString); pubbw.flush(); pubbw.close(); pubfw.close(); pribw.flush(); pribw.close(); prifw.close(); } catch (Exception e) { e.printStackTrace(); } } /** * 公钥加密 * @param data 需要加密的内容 * @param publicKey 公钥 * @return */ public static String publicEncrypt(String data, RSAPublicKey publicKey){ try{ Cipher cipher = Cipher.getInstance("RSA"); cipher.init(Cipher.ENCRYPT_MODE, publicKey); return new String(Base64.encodeBase64(rsaSplitCodec(cipher, Cipher.ENCRYPT_MODE, data.getBytes("UTF-8"), publicKey.getModulus().bitLength()))); }catch(Exception e){ throw new RuntimeException("加密字符串[" + data + "]时遇到异常", e); } } /** * 私钥解密 * @param data 需要加密的内容 * @param privateKey 私钥 * @return */ public static String privateDecrypt(String data, RSAPrivateKey privateKey){ try{ Cipher cipher = Cipher.getInstance("RSA"); cipher.init(Cipher.DECRYPT_MODE, privateKey); return new String(rsaSplitCodec(cipher, Cipher.DECRYPT_MODE, Base64.decodeBase64(data), privateKey.getModulus().bitLength()), "UTF-8"); }catch(Exception e){ throw new RuntimeException("解密字符串[" + data + "]时遇到异常", e); } } private static byte[] rsaSplitCodec(Cipher cipher, int opmode, byte[] datas, int keySize){ int maxBlock = 0; if(opmode == Cipher.DECRYPT_MODE){ maxBlock = keySize / 8; }else{ maxBlock = keySize / 8 - 11; } ByteArrayOutputStream out = new ByteArrayOutputStream(); int offSet = 0; byte[] buff; int i = 0; try{ while(datas.length > offSet){ if(datas.length-offSet > maxBlock){ buff = cipher.doFinal(datas, offSet, maxBlock); }else{ buff = cipher.doFinal(datas, offSet, datas.length-offSet); } out.write(buff, 0, buff.length); i++; offSet = i * maxBlock; } }catch(Exception e){ throw new RuntimeException("加解密阀值为["+maxBlock+"]的数据时发生异常", e); } byte[] resultDatas = out.toByteArray(); IOUtils.closeQuietly(out); return resultDatas; } public static String encryptBASE64(byte[] key) throws Exception { // return (new BASE64Encoder()).encodeBuffer(key); return (Base64.encodeBase64String(key)); } public static byte[] decryptBASE64(String key) throws Exception { // return (new BASE64Decoder()).decodeBuffer(key); return (Base64.decodeBase64(key)); } }
------------------
扩展 AESUtil
package com.zhetang.netty; import org.apache.commons.codec.binary.Base64; import javax.crypto.Cipher; import javax.crypto.KeyGenerator; import javax.crypto.spec.IvParameterSpec; import javax.crypto.spec.SecretKeySpec; import java.security.SecureRandom; public class AESUtil { // AES secretKey length (must be 16 bytes) public static final String secretKey = "QAZWSXEDCRFVTGBH"; // AES密码器 private static Cipher cipher; // 字符串编码 private static final String KEY_CHARSET = "UTF-8"; // 算法方式 private static final String KEY_ALGORITHM = "AES"; // 算法/模式/填充 private static final String CIPHER_ALGORITHM_CBC = "AES/CBC/PKCS5Padding"; // 私钥大小128/192/256(bits)位 即:16/24/32bytes,暂时使用128,如果扩大需要更换java/jre里面的jar包 private static final Integer PRIVATE_KEY_SIZE_BIT = 128; private static final Integer PRIVATE_KEY_SIZE_BYTE = 16; /** * @param plainText 明文:要加密的内容 * @return 密文:加密后的内容,如有异常返回空串:"" * @Description: 加密 * @Author:杨攀 * @Since: 2019年9月17日上午10:17:18 */ public static String encrypt(String plainText) { return encrypt(secretKey, plainText); } /** * @param secretKey 密钥:加密的规则 16位 * @param plainText 明文:要加密的内容 * @return cipherText 密文:加密后的内容,如有异常返回空串:"" * @Description: 加密 * @Author:杨攀 * @Since: 2019年9月12日下午7:09:31 */ public static String encrypt(String secretKey, String plainText) { if (secretKey.length() != PRIVATE_KEY_SIZE_BYTE) { throw new RuntimeException("AESUtil:Invalid AES secretKey length (must be 16 bytes)"); } // 密文字符串 String cipherText = ""; try { // 加密模式初始化参数 initParam(secretKey, Cipher.ENCRYPT_MODE); // 获取加密内容的字节数组 byte[] bytePlainText = plainText.getBytes(KEY_CHARSET); // 执行加密 byte[] byteCipherText = cipher.doFinal(bytePlainText); cipherText = Base64.encodeBase64String(byteCipherText); } catch (Exception e) { throw new RuntimeException("AESUtil:encrypt fail!", e); } return cipherText; } /** * @param cipherText 密文:加密后的内容,即需要解密的内容 * @return 明文:解密后的内容即加密前的内容,如有异常返回空串:"" * @Description: 解密 * @Author:杨攀 * @Since: 2019年9月17日上午10:18:19 */ public static String decrypt(String cipherText) { return decrypt(secretKey, cipherText); } /** * @param secretKey 密钥:加密的规则 16位 * @param cipherText 密文:加密后的内容,即需要解密的内容 * @return * @Description: 解密 * @Author:杨攀 * @Since: 2019年9月12日下午7:10:06 */ public static String decrypt(String secretKey, String cipherText) { if (secretKey.length() != PRIVATE_KEY_SIZE_BYTE) { throw new RuntimeException("AESUtil:Invalid AES secretKey length (must be 16 bytes)"); } // 明文字符串 String plainText = ""; try { initParam(secretKey, Cipher.DECRYPT_MODE); // 将加密并编码后的内容解码成字节数组 byte[] byteCipherText = Base64.decodeBase64(cipherText); // 解密 byte[] bytePlainText = cipher.doFinal(byteCipherText); plainText = new String(bytePlainText, KEY_CHARSET); } catch (Exception e) { throw new RuntimeException("AESUtil:decrypt fail!", e); } return plainText; } /** * 初始化参数 * * @param secretKey 密钥:加密的规则 16位 * @param mode 加密模式:加密or解密 */ public static void initParam(String secretKey, int mode) { try { // 防止Linux下生成随机key SecureRandom secureRandom = SecureRandom.getInstance("SHA1PRNG"); secureRandom.setSeed(secretKey.getBytes()); // 获取key生成器 KeyGenerator keygen = KeyGenerator.getInstance(KEY_ALGORITHM); keygen.init(PRIVATE_KEY_SIZE_BIT, secureRandom); // 获得原始对称密钥的字节数组 byte[] raw = secretKey.getBytes(); // 根据字节数组生成AES内部密钥 SecretKeySpec key = new SecretKeySpec(raw, KEY_ALGORITHM); // 根据指定算法"AES/CBC/PKCS5Padding"实例化密码器 cipher = Cipher.getInstance(CIPHER_ALGORITHM_CBC); IvParameterSpec iv = new IvParameterSpec(secretKey.getBytes()); cipher.init(mode, key, iv); } catch (Exception e) { throw new RuntimeException("AESUtil:initParam fail!", e); } } public static void main(String[] args) { long s = System.currentTimeMillis(); String text = "xxxx"; String encryptMsg = encrypt(secretKey, text); System.out.println("密文为:" + encryptMsg); long e = System.currentTimeMillis(); System.out.println(e - s); String decryptMsg = decrypt(secretKey, encryptMsg); System.out.println("明文为:" + decryptMsg); long d = System.currentTimeMillis(); System.out.println(d - e); } }
Netty(六)UDP在netty中的使用
关于UDP的介绍,这里不在阐述。
相比于TCP而言,UDP不存在客户端和服务端的实际链接,因此不需要为连接(ChannelPipeline)设置handler。
服务端:
1 public void run(int port)throws Exception{ 2 EventLoopGroup group = new NioEventLoopGroup(); 3 try { 4 Bootstrap b = new Bootstrap(); 5 b.group(group).channel(NioDatagramChannel.class) 6 .option(ChannelOption.SO_BROADCAST,true) 7 .handler(new UdpServerHandler()); 8 9 b.bind(port).sync().channel().closeFuture().await(); 10 } 11 finally { 12 group.shutdownGracefully(); 13 } 14 }
1 @Override 2 public void messageReceived(ChannelHandlerContext channelHandlerContext, 3 DatagramPacket datagramPacket) throws Exception { 4 // 因为Netty对UDP进行了封装,所以接收到的是DatagramPacket对象。 5 String req = datagramPacket.content().toString(CharsetUtil.UTF_8); 6 System.out.println(req); 7 8 if("啪啪啪来拉!!!".equals(req)){ 9 channelHandlerContext.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer( 10 "结果:",CharsetUtil.UTF_8),datagramPacket.sender())); 11 } 12 }
客户端:
public void run(int port)throws Exception{ EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group).channel(NioDatagramChannel.class) .option(ChannelOption.SO_BROADCAST,true) .handler(new UdpClientHandler()); Channel ch = b.bind(0).sync().channel(); // 向网段类所有机器广播发UDP ch.writeAndFlush( new DatagramPacket( Unpooled.copiedBuffer("啪啪啪来拉!!!", CharsetUtil.UTF_8), new InetSocketAddress( "255.255.255.255",port ))).sync(); if(!ch.closeFuture().await(15000)){ System.out.println("查询超时!!!"); } } finally { group.shutdownGracefully(); } }
public void messageReceived(ChannelHandlerContext channelHandlerContext, DatagramPacket datagramPacket) throws Exception { String response = datagramPacket.content().toString(CharsetUtil.UTF_8); if(response.startsWith("结果:")){ System.out.println(response); channelHandlerContext.close(); } }
源码下载
源码在src/main/java/Unp下,分为客户端和服务端,他们的代码基本和Netty入门章节的代码类似,只是减少了相关的解码器使用。