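The previous article walked through an example of a UDP server built with Netty.
This article integrates that UDP server with Spring Boot, uses RedisTemplate to access Redis and Spring Data JPA to connect to a MySQL database, and touches on multithreading and asynchronous execution along the way.
Only a skeleton is published here; readers who need more can extend it with their own modules, for example a Controller layer.
I use IntelliJ IDEA 2017.1 (link: http://pan.baidu.com/s/1pLODHm7 password: dlx7); I recommend STS or IDEA for learning Spring.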
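1) Project directory structure
The overall layout of the project, as far as it can be read from the package declarations in the listings below, is: all sources live under com.example.demo, in the packages init (StartupEvent, UdpServer, SysConfig), handle (UdpServerHandler), mod (UdpRecord), repository.mysql (UdpRepository), repository.redis (RedisRepository) and thread (TaskExecutePool). The configuration files are application.properties, application-test.properties and logback.xml.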
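2) JAR dependencies
The contents of pom.xml are as follows.
Only netty-all and commons-lang3 were added by hand; the other dependencies were imported automatically from the components selected when the Spring Boot project was created.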
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>udplearning</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>udplearning</name>
    <description>Demo project for Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.6.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <commons-lang3.version>3.4</commons-lang3.version>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>

        <!-- netty -->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.0.49.Final</version>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>${commons-lang3.version}</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web-services</artifactId>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>
3) Configuration file application.properties
The contents of application.properties:
spring.profiles.active=test

spring.messages.encoding=utf-8

logging.config=classpath:logback.xml
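"spring.profiles.active" selects which environment-specific configuration Spring Boot loads. Here the active profile is test, so the settings in application-test.properties are loaded in addition to this file;
"spring.messages.encoding=utf-8" sets the encoding used for message bundles to UTF-8;
"logging.config=classpath:logback.xml" gives the location of the logging configuration file.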
The contents of application-test.properties are as follows:
context.listener.classes=com.example.demo.init.StartupEvent

# mysql
spring.jpa.show-sql=true
spring.jpa.database=mysql
#spring.jpa.hibernate.ddl-auto=update
spring.datasource.url=jdbc:mysql://127.0.0.1/test
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.jdbc-interceptors=ConnectionState;SlowQueryReport(threshold=0)

spring.session.store-type=none

# (RedisProperties)
spring.redis.database=3
spring.redis.host=127.0.0.1
spring.redis.port=6379
spring.redis.password=123456
spring.redis.pool.max-active=8
spring.redis.pool.max-wait=-1
spring.redis.pool.max-idle=8
spring.redis.pool.min-idle=0
spring.redis.timeout=0

# UDP message receive port
sysfig.udpReceivePort = 7686

# Thread pool
# These keys use the sysfig prefix because SysConfig binds only properties under
# that prefix; under spring.task.pool.* the values would stay at 0.
sysfig.corePoolSize = 5
sysfig.maxPoolSize = 100
sysfig.keepAliveSeconds = 100
sysfig.queueCapacity = 100
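The line context.listener.classes=com.example.demo.init.StartupEvent registers the StartupEvent class as an application listener, so its code runs once Spring Boot has finished refreshing the application context.
The file also holds the MySQL and Redis settings and a few custom properties; change them to match your own project.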
4) Logging configuration logback.xml
The contents of logback.xml are as follows:
<?xml version="1.0" encoding="UTF-8"?>
<configuration xmlns="http://ch.qos.logback/xml/ns/logback"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:schemaLocation="http://ch.qos.logback/xml/ns/logback
               http://ch.qos.logback/xml/ns/logback/logback.xsd
               http://ch.qos.logback/xml/ns/logback ">
    <property name="APP_Name" value="udplearning" />
    <timestamp key="bySecond" datePattern="yyyyMMdd'T'HHmmss" />
    <contextName>${APP_Name}</contextName>

    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{yyyyMMddHHmmss}|%-5level| %logger{0}.%M | %msg | %thread %n</pattern>
        </encoder>
    </appender>

    <appender name="FILELOG" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${catalina.home}/logs/app.%d{yyyyMMdd}.log</fileNamePattern>
            <maxHistory>30</maxHistory>
        </rollingPolicy>
        <encoder>
            <pattern>%d{yyMMddHHmmss.SSS}|%-5level| %msg%n</pattern>
        </encoder>
    </appender>

    <appender name="RUNLOG" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${catalina.home}/logs/run.%d{yyyyMMdd}.log</fileNamePattern>
            <maxHistory>7</maxHistory>
        </rollingPolicy>
        <encoder>
            <pattern>%d{yyMMddHHmmss.SSS}|%-5level| %msg%n</pattern>
        </encoder>
    </appender>

    <logger name="com.example.demo" level="debug" additivity="false">
        <appender-ref ref="STDOUT" />
        <appender-ref ref="FILELOG" />
    </logger>

    <root level="info">
        <appender-ref ref="STDOUT" />
    </root>
</configuration>
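The root logger runs at info level, while the com.example.demo package logs at debug level to both the console and the FILELOG appender; adjust the levels to suit your own project. The file appenders write under ${catalina.home}/logs, so change that path if catalina.home is not defined in your environment. The RUNLOG appender is defined but not yet attached to any logger.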
5) StartupEvent.java
package com.example.demo.init;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;

/**
 * Runs after the application context has been refreshed and starts the UDP server.
 * Created by wj on 2017/8/28.
 */
public class StartupEvent implements ApplicationListener<ContextRefreshedEvent> {
    private static final Logger log = LoggerFactory.getLogger(StartupEvent.class);

    // Kept in a static field so that classes not managed by Spring can look up beans.
    private static ApplicationContext context;

    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        try {
            context = contextRefreshedEvent.getApplicationContext();

            SysConfig sysConfig = context.getBean(SysConfig.class);

            // Receive UDP messages and save them to Redis
            UdpServer udpServer = StartupEvent.getBean(UdpServer.class);
            udpServer.run(sysConfig.getUdpReceivePort());

            // More threads can be started here to run other tasks.
            // That part is work-related and cannot be published.
        } catch (Exception e) {
            log.error("Exception", e);
        }
    }

    /** Looks up a bean by type; returns null if the context has not been set yet. */
    public static <T> T getBean(Class<T> beanType) {
        return context != null ? context.getBean(beanType) : null;
    }
}
6) UdpServer.java
package com.example.demo.init;

import com.example.demo.handle.UdpServerHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioDatagramChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

/**
 * The UDP server.
 * Created by wj on 2017/8/30.
 */
@Component
public class UdpServer {

    private static final Logger log = LoggerFactory.getLogger(UdpServer.class);

    // private static final int PORT = Integer.parseInt(System.getProperty("port", "7686"));

    @Async("myTaskAsyncPool")
    public void run(int udpReceivePort) {

        EventLoopGroup group = new NioEventLoopGroup();
        log.info("Server start! Udp Receive msg Port:" + udpReceivePort);

        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioDatagramChannel.class)
                    .option(ChannelOption.SO_BROADCAST, true)
                    .handler(new UdpServerHandler());

            // Bind the port, then block this pool thread until the channel is closed
            b.bind(udpReceivePort).sync().channel().closeFuture().await();
        } catch (InterruptedException e) {
            log.error("UDP server interrupted", e);
            // Restore the interrupt flag so the pool can see the interruption
            Thread.currentThread().interrupt();
        } finally {
            group.shutdownGracefully();
        }
    }

}
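NioDatagramChannel.class selects Netty's non-blocking (NIO) transport for receiving UDP messages. If only a small volume of UDP messages is expected, a blocking transport could be used instead (in Netty 4.0, OioDatagramChannel together with OioEventLoopGroup).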
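To make the blocking versus non-blocking distinction concrete without Netty, here is a minimal sketch using the JDK's own java.nio.channels.DatagramChannel. The class and method names (NonBlockingUdpDemo, idleReceiveReturnsNull) are made up for illustration and are not part of the project. In non-blocking mode, receive returns null straight away when no datagram is waiting, whereas a blocking channel would park the calling thread until one arrives.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;

final class NonBlockingUdpDemo {

    /** Returns true if receive() on an idle non-blocking channel comes back at once with null. */
    static boolean idleReceiveReturnsNull() {
        try (DatagramChannel channel = DatagramChannel.open()) {
            // Port 0 lets the OS pick a free port on the loopback interface
            channel.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            channel.configureBlocking(false);

            ByteBuffer buffer = ByteBuffer.allocate(1024);
            // Nobody has sent anything, so a non-blocking receive yields null
            return channel.receive(buffer) == null;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("idle receive returned null: " + idleReceiveReturnsNull());
    }
}
```

Netty's NIO transport builds on the same idea, but uses a selector to learn when data is ready instead of polling.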
The UdpServer.run() method is annotated with @Async, which makes it run asynchronously; myTaskAsyncPool is the name of a custom thread pool.
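Why asynchronous execution matters here: run() blocks until the channel closes, so calling it directly would hold up the startup listener that invokes it. The following plain-JDK sketch illustrates the effect of handing such a blocking call to a pool thread. It is only an analogy for what @Async arranges, and AsyncStartDemo and its method are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class AsyncStartDemo {

    /** Starts a long-running task on a pool thread; returns true if the caller was not blocked by it. */
    static boolean callerIsNotBlocked() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch stop = new CountDownLatch(1);
        try {
            // Stand-in for udpServer.run(port): blocks until told to stop
            pool.execute(() -> {
                started.countDown();
                try {
                    stop.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            // The caller continues immediately while the task is still blocked
            return started.await(5, TimeUnit.SECONDS) && stop.getCount() == 1;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            stop.countDown();   // let the task finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("caller not blocked: " + callerIsNotBlocked());
    }
}
```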
7) UdpServerHandler.java
package com.example.demo.handle;

import com.example.demo.init.StartupEvent;
import com.example.demo.mod.UdpRecord;
import com.example.demo.repository.mysql.UdpRepository;
import com.example.demo.repository.redis.RedisRepository;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramPacket;
import io.netty.util.CharsetUtil;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Timestamp;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Receives UDP messages and saves them to a Redis list.
 * Created by wj on 2017/8/30.
 */
public class UdpServerHandler extends SimpleChannelInboundHandler<DatagramPacket> {

    private static final Logger log = LoggerFactory.getLogger(UdpServerHandler.class);

    // Counts how many UDP messages the server has received; atomic so the count
    // stays correct even if handlers run on more than one thread
    private static final AtomicInteger count = new AtomicInteger();

    @Override
    public void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {

        String receiveMsg = packet.content().toString(CharsetUtil.UTF_8);

        log.info("Received UDP Msg:" + receiveMsg);

        // Check whether the received UDP message is valid (real validation not implemented yet)
        if (StringUtils.isNotEmpty(receiveMsg)) {

            // Count the received UDP messages
            int received = count.incrementAndGet();

            // Look up the UdpRepository bean and log the received message to MySQL
            UdpRecord udpRecord = new UdpRecord();
            udpRecord.setUdpMsg(receiveMsg);
            udpRecord.setTime(getTime());
            UdpRepository udpRepository = (UdpRepository) StartupEvent.getBean(UdpRepository.class);
            udpRepository.save(udpRecord);

            // Look up the RedisRepository bean
            RedisRepository redisRepository = (RedisRepository) StartupEvent.getBean(RedisRepository.class);
            // Save the received UDP message to a Redis list
            redisRepository.lpush("udp:msg", receiveMsg);
            redisRepository.setKey("UDPMsgNumber", String.valueOf(received));

            // Optionally acknowledge receipt to the sender; since this is UDP,
            // the reply can be commented out
            ctx.write(new DatagramPacket(
                    Unpooled.copiedBuffer("QOTM: " + "Got UDP Message!", CharsetUtil.UTF_8),
                    packet.sender()));

        } else {
            log.error("Received Error UDP Message:" + receiveMsg);
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.error("Exception in UDP handler", cause);
        // We don't close the channel because we can keep serving requests.
    }

    public Timestamp getTime() {
        return new Timestamp(System.currentTimeMillis());
    }

}
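Without going through ApplicationContext.getBean, the handler has no way to obtain the RedisRepository object.
Note: @Autowired cannot be used here to obtain the repositories or a redisTemplate. UdpServerHandler is created with new in UdpServer, so it is not a Spring-managed bean and Spring never injects its fields.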
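One alternative to a static lookup is to hand the dependency to the handler through its constructor at the point where it is created with new. The sketch below shows only the shape of that idea in plain Java. MessageStore, InjectedHandler and InjectionDemo are hypothetical names, and the lambda stands in for the Spring-managed repository; the project itself does not do this.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the project's RedisRepository. */
interface MessageStore {
    void push(String key, String value);
}

/** A handler that receives its collaborator through the constructor instead of a static lookup. */
final class InjectedHandler {
    private final MessageStore store;

    InjectedHandler(MessageStore store) {
        this.store = store;
    }

    void onMessage(String msg) {
        // Same emptiness check as the handler above, then store the message
        if (msg != null && !msg.isEmpty()) {
            store.push("udp:msg", msg);
        }
    }
}

final class InjectionDemo {

    /** Feeds one valid and one empty message through the handler and returns what was stored. */
    static List<String> run() {
        List<String> saved = new ArrayList<>();
        // In a Spring application this would be the injected bean, passed in by the creating class
        MessageStore store = (key, value) -> saved.add(key + "=" + value);
        InjectedHandler handler = new InjectedHandler(store);
        handler.onMessage("hello");
        handler.onMessage("");
        return saved;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The trade-off is that the class creating the handler must itself be a Spring bean that holds the repositories, which is the case for a @Component.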
8) Repositories
RedisRepository.java
package com.example.demo.repository.redis;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

/**
 * Talks to Redis.
 * Implements lpush and rpop on a list.
 * Created by wj on 2017/8/30.
 */
@Service
public class RedisRepository {
    private static final Logger log = LoggerFactory.getLogger(RedisRepository.class);

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    //----------------String-----------------------
    public void setKey(String key, String value) {
        redisTemplate.opsForValue().set(key, value);
    }

    //----------------list----------------------
    public Long lpush(String key, String val) {
        log.info("Saving UDP msg to redis, key:{}, val:{}", key, val);
        return redisTemplate.opsForList().leftPush(key, val);
    }

    public String rpop(String key) {
        return redisTemplate.opsForList().rightPop(key);
    }

}
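The RedisTemplate class from the Spring framework is used to talk to Redis. Each received UDP message is pushed onto the left end of a list (lpush) and later popped from the right end (rpop), so the list behaves as a first-in, first-out queue.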
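The queue behaviour of lpush followed by rpop can be seen with an in-memory analogy. This sketch uses java.util.ArrayDeque in place of a Redis list; it does not talk to Redis, and ListQueueDemo is a made-up name.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class ListQueueDemo {

    /** Pushes every message at the head, then pops from the tail until the list is empty. */
    static List<String> pushThenPop(String... messages) {
        Deque<String> list = new ArrayDeque<>();
        for (String message : messages) {
            list.addFirst(message);              // like LPUSH: insert at the head
        }
        List<String> popped = new ArrayList<>();
        String next;
        while ((next = list.pollLast()) != null) {   // like RPOP: remove from the tail
            popped.add(next);
        }
        return popped;
    }

    public static void main(String[] args) {
        // Messages come out in the order they went in
        System.out.println(pushThenPop("first", "second", "third"));
    }
}
```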
UdpRepository.java
package com.example.demo.repository.mysql;

import com.example.demo.mod.UdpRecord;
import org.springframework.data.jpa.repository.JpaRepository;

/**
 * Created by wj on 2017/8/31.
 */
public interface UdpRepository extends JpaRepository<UdpRecord, Long> {

}
This defines the Spring Data JPA repository interface used to access the database.
The entity it manages is:
UdpRecord.java
package com.example.demo.mod;

import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.Id;
import javax.persistence.Table;
import java.sql.Timestamp;

/**
 * Created by wj on 2017/8/31.
 *
 * Records a log entry for each received UDP message.
 */
@Entity
@Table
public class UdpRecord {

    private long id;
    private String udpMsg;
    private Timestamp time;

    @Id
    @GeneratedValue
    public long getId() {
        return id;
    }

    public void setId(long id) {
        this.id = id;
    }

    public String getUdpMsg() {
        return udpMsg;
    }

    public void setUdpMsg(String udpMsg) {
        this.udpMsg = udpMsg;
    }

    public Timestamp getTime() {
        return time;
    }

    public void setTime(Timestamp time) {
        this.time = time;
    }
}
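The @Entity and @Table annotations mark the class as a JPA entity mapped to a database table. @Id marks id as the primary key, and @GeneratedValue has the persistence provider generate its value automatically (with MySQL this is typically an auto-increment column).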
9) Thread pool
TaskExecutePool.java
package com.example.demo.thread;

import com.example.demo.init.SysConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * Created by wangjian on 2017/8/29.
 */
@Configuration
@EnableAsync
public class TaskExecutePool {

    @Autowired
    private SysConfig config;

    @Bean
    public Executor myTaskAsyncPool() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(config.getCorePoolSize());
        executor.setMaxPoolSize(config.getMaxPoolSize());
        executor.setQueueCapacity(config.getQueueCapacity());
        executor.setKeepAliveSeconds(config.getKeepAliveSeconds());
        executor.setThreadNamePrefix("MyExecutor-");

        // Rejection policy: what to do with a new task when the pool is at max size
        // and the queue is full.
        // CALLER_RUNS: do not run the task on a new thread; run it on the caller's thread.
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }
}
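To see what CallerRunsPolicy does in practice, here is a small sketch using the JDK's ThreadPoolExecutor directly. The pool is deliberately tiny (one thread, queue capacity one) so that the third task is rejected; these sizes and the name CallerRunsDemo are chosen only for the demonstration and are not the project's settings.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class CallerRunsDemo {

    /** Returns the name of the thread that ran the task submitted while the pool was saturated. */
    static String threadOfRejectedTask() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        List<String> ranOn = new CopyOnWriteArrayList<>();
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.execute(blocker);   // occupies the only worker thread
            pool.execute(blocker);   // waits in the queue (capacity 1)
            // Pool and queue are both full, so the policy runs this task on the submitting thread
            pool.execute(() -> ranOn.add(Thread.currentThread().getName()));
            return ranOn.get(0);
        } finally {
            release.countDown();     // unblock the two waiting tasks
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected task ran on: " + threadOfRejectedTask());
    }
}
```

The practical consequence is back-pressure: when the pool is saturated, the submitter slows down because it does the work itself, and no task is dropped.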
10) Configuration class SysConfig.java
package com.example.demo.init;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

/**
 * Binds the custom properties that start with the "sysfig" prefix.
 * Created by wj on 2017/8/30.
 */
@Component
@ConfigurationProperties(prefix = "sysfig")
public class SysConfig {
    private int udpReceivePort; // UDP message receive port

    // Thread pool settings
    private int corePoolSize;

    private int maxPoolSize;

    private int keepAliveSeconds;

    private int queueCapacity;

    public int getCorePoolSize() {
        return corePoolSize;
    }

    public void setCorePoolSize(int corePoolSize) {
        this.corePoolSize = corePoolSize;
    }

    public int getMaxPoolSize() {
        return maxPoolSize;
    }

    public void setMaxPoolSize(int maxPoolSize) {
        this.maxPoolSize = maxPoolSize;
    }

    public int getKeepAliveSeconds() {
        return keepAliveSeconds;
    }

    public void setKeepAliveSeconds(int keepAliveSeconds) {
        this.keepAliveSeconds = keepAliveSeconds;
    }

    public int getQueueCapacity() {
        return queueCapacity;
    }

    public void setQueueCapacity(int queueCapacity) {
        this.queueCapacity = queueCapacity;
    }

    public int getUdpReceivePort() {
        return udpReceivePort;
    }

    public void setUdpReceivePort(int udpReceivePort) {
        this.udpReceivePort = udpReceivePort;
    }
}
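A class annotated with @ConfigurationProperties(prefix="sysfig") only receives properties whose keys start with that prefix. The sketch below is a deliberately simplified model of the prefix matching using java.util.Properties. It is not Spring Boot's actual binder, which also applies relaxed naming and type conversion, and PrefixBindingDemo is a hypothetical name.

```java
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

final class PrefixBindingDemo {

    /** Collects the properties under "prefix." with the prefix stripped from each key. */
    static Map<String, String> keysUnderPrefix(Properties props, String prefix) {
        Map<String, String> bound = new TreeMap<>();
        String dotted = prefix + ".";
        for (String name : props.stringPropertyNames()) {
            if (name.startsWith(dotted)) {
                bound.put(name.substring(dotted.length()), props.getProperty(name));
            }
        }
        return bound;
    }

    /** Example: only the sysfig.* key is picked up; the other prefix is ignored. */
    static Map<String, String> example() {
        Properties props = new Properties();
        props.setProperty("sysfig.udpReceivePort", "7686");
        props.setProperty("spring.task.pool.corePoolSize", "5");
        return keysUnderPrefix(props, "sysfig");
    }

    public static void main(String[] args) {
        System.out.println(example());
    }
}
```

An int field whose key is not found under the prefix simply keeps its default value of 0, which is worth checking whenever a bound value looks wrong.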
11) Summary
The core code for sending and receiving UDP messages is actually very simple; the Netty framework just wraps it.
Sending a UDP message:
byte[] buffer = ...
InetAddress address = InetAddress.getByName("localhost");

DatagramPacket packet = new DatagramPacket(
        buffer, buffer.length, address, 9999);
DatagramSocket datagramSocket = new DatagramSocket();
datagramSocket.send(packet);
Receiving a UDP message:
DatagramSocket datagramSocket = new DatagramSocket(9999);

byte[] buffer = ....
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);

datagramSocket.receive(packet);
Looks simple, doesn't it?
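For readers who want to run the two fragments end to end, here is a self-contained sketch that sends one datagram to itself over the loopback interface. The buffer size, the two-second timeout, the OS-assigned port and the name UdpLoopbackDemo are assumptions made for the demo, not values taken from the project.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;

final class UdpLoopbackDemo {

    /** Sends the message over loopback UDP and returns what the receiving socket got. */
    static String sendAndReceive(String message) {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (DatagramSocket receiver = new DatagramSocket(0, loopback);   // port 0: OS picks a free port
             DatagramSocket sender = new DatagramSocket()) {
            receiver.setSoTimeout(2000);   // do not wait forever if the packet is lost

            byte[] payload = message.getBytes(StandardCharsets.UTF_8);
            sender.send(new DatagramPacket(payload, payload.length, loopback, receiver.getLocalPort()));

            byte[] buffer = new byte[1024];
            DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
            receiver.receive(packet);      // blocks until a datagram arrives or the timeout expires
            return new String(packet.getData(), packet.getOffset(), packet.getLength(),
                    StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("received: " + sendAndReceive("hello udp"));
    }
}
```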
12) Source code download
https://github.com/wj302763621/learning_udp.git
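Only a skeleton is published here; much of the rest involves work-related content and cannot be released.
Anyone who needs it is welcome to download the code and modify it.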