mysql udp服务器_netty学习:UDP服务器与Spring整合(2)
上一篇文章中,介绍了netty实现UDP服务器的栗子。
本文将会对UDP服务器与spring boot整合起来,并使用RedisTemplate的操作类访问Redis和使用Spring DATA JPA链接MySQL数据库,其中会使用多线程、异步等知识。
只公布了一个框架,需要的同学可以根据此来进行扩展,增加自己需要的功能模块。如Controller部分。
本人使用的编辑器是IntelliJ IDEA 2017.1.exe版本(链接:http://pan.baidu.com/s/1pLODHm7 密码:dlx7);建议使用STS或者是idea编辑器来进行spring的学习。
1)项目目录结构
整个项目的目录结构如下:
2)jar包
其中pom.xml文件的内容如下:
只有netty-all和commons-lang3是手动加入的jar包,其余的都是创建spring boot项目时候选择组件后自动导入的。
1 <?xml version="1.0" encoding="UTF-8"?>
2
3 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4 4.0.0
5
6 com.example
7 udplearning
8 0.0.1-SNAPSHOT
9 jar
10
11 udplearning
12 Demo project for Spring Boot
13
14
15 org.springframework.boot
16 spring-boot-starter-parent
17 1.5.6.RELEASE
18
19
20
21
22 UTF-8
23 UTF-8
24 3.4
25 1.8
26
27
28
29
30
31
32
33 io.netty
34 netty-all
35 4.0.49.Final
36
37
38
39
40 org.apache.commons
41 commons-lang3
42 ${commons-lang3.version}
43
44
45
46
47
48 org.springframework.boot
49 spring-boot-starter-data-jpa
50
51
52 org.springframework.boot
53 spring-boot-starter-data-redis
54
55
56 org.springframework.boot
57 spring-boot-starter-jdbc
58
59
60 org.springframework.boot
61 spring-boot-starter-web
62
63
64 org.springframework.boot
65 spring-boot-starter-web-services
66
67
68
69 mysql
70 mysql-connector-java
71 runtime
72
73
74 org.springframework.boot
75 spring-boot-starter-test
76 test
77
78
79
80
81
82
83 org.springframework.boot
84 spring-boot-maven-plugin
85
86
87
88
89
90
3)配置文件application.properties
application.properties的内容:
1 spring.profiles.active=test2
3 spring.messages.encoding=utf-8
4
5 logging.config=classpath:logback.xml
“spring.profiles.active” 针对多种启动环境的spring boot配置方法,此时启动的是test运行环境,即默认是启动application-test.properties里面的配置信息;
“spring.messages.encoding=utf-8”是指编码方式utf-8;
“logging.config=classpath:logback.xml”是指日志文件位置。
application-test.properties的内容如下:
1 context.listener.classes=com.example.demo.init.StartupEvent2
3 #mysql4 spring.jpa.show-sql=true
5 spring.jpa.database=mysql6 #spring.jpa.hibernate.ddl-auto=update7 spring.datasource.url=jdbc:mysql://127.0.0.1/test
8 spring.datasource.username=root9 spring.datasource.password=123456
10 spring.datasource.driver-class-name=com.mysql.jdbc.Driver11 spring.datasource.jdbc-interceptors=ConnectionState;SlowQueryReport(threshold=0)12
13 spring.session.store-type=none14
15 # (RedisProperties)16 spring.redis.database=3
17 spring.redis.host=127.0.0.1
18 spring.redis.port=6379
19 spring.redis.password=123456
20 spring.redis.pool.max-active=8
21 spring.redis.pool.max-wait=-1
22 spring.redis.pool.max-idle=8
23 spring.redis.pool.min-idle=0
24 spring.redis.timeout=0
25
26
27 #UDP消息接收打端口28 sysfig.udpReceivePort = 7686
29
30 #线程池31 spring.task.pool.corePoolSize = 5
32 spring.task.pool.maxPoolSize = 100
33 spring.task.pool.keepAliveSeconds = 100
34 spring.task.pool.queueCapacity = 100
其中配置了context.listener.classes=com.example.demo.init.StartupEvent,将StartupEvent类作为Spring boot启动后执行文件。
其中还配置了一些mysql、redis和自定义的属性。可根据项目的实际情况修改。
4)日志文件logback.xml
logback.xml的内容如下:
1 <?xml version="1.0" encoding="UTF-8"?>
2
3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4 xsi:schemaLocation="http://ch.qos.logback/xml/ns/logback
5 http://ch.qos.logback/xml/ns/logback/logback.xsd
6 http://ch.qos.logback/xml/ns/logback ">
7
8
9 ${APP_Name}
10
11
12
13 %d{yyyyMMddHHmmss}|%-5level| %logger{0}.%M | %msg | %thread %n
14
15
16
17
18
19 ${catalina.home}/logs/app.%d{yyyyMMdd}.log
20 30
21
22
23 %d{yyMMddHHmmss.SSS}|%-5level| %msg%n
24
25
26
27
28
29 ${catalina.home}/logs/run.%d{yyyyMMdd}.log
30 7
31
32
33 %d{yyMMddHHmmss.SSS}|%-5level| %msg%n
34
35
36
37
38
39
40
41
42
43
44
45
日志的级别是info级别 可以根据自己项目的实际情况进行设置。
5)StartupEvent.java
1 packagecom.example.demo.init;2
3 importorg.slf4j.Logger;4 importorg.slf4j.LoggerFactory;5 importorg.springframework.context.ApplicationContext;6 importorg.springframework.context.ApplicationListener;7 importorg.springframework.context.event.ContextRefreshedEvent;8
9 /**
10 *11 * Created by wj on 2017/8/28.12 */
13
14 public class StartupEvent implements ApplicationListener{15 private static final Logger log = LoggerFactory.getLogger(StartupEvent.class);16
17 private staticApplicationContext context;18
19 @Override20 public voidonApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {21
22 try{23
24 context =contextRefreshedEvent.getApplicationContext();25
26 SysConfig sysConfig = (SysConfig) context.getBean(SysConfig.class);27
28 //接收UDP消息并保存至redis中
29 UdpServer udpServer = (UdpServer)StartupEvent.getBean(UdpServer.class);30 udpServer.run(sysConfig.getUdpReceivePort());31
32
33 //这里可以开启多个线程去执行不同的任务34 //此处为工作的内容,不便公开!
35
36
37 } catch(Exception e) {38 log.error("Exception", e);39 }40 }41
42 public staticObject getBean(Class beanName) {43 return context != null ? context.getBean(beanName) : null;44 }45 }
6)UdpServer.java
1 packagecom.example.demo.init;2
3 importcom.example.demo.handle.UdpServerHandler;4 importio.netty.bootstrap.Bootstrap;5 importio.netty.channel.ChannelOption;6 importio.netty.channel.EventLoopGroup;7 importio.netty.channel.nio.NioEventLoopGroup;8 importio.netty.channel.socket.nio.NioDatagramChannel;9 importorg.slf4j.Logger;10 importorg.slf4j.LoggerFactory;11 importorg.springframework.scheduling.annotation.Async;12 importorg.springframework.stereotype.Component;13
14 /**
15 * server服务器16 * Created by wj on 2017/8/30.17 */
18 @Component19 public classUdpServer {20
21 private static final Logger log= LoggerFactory.getLogger(UdpServer.class);22
23 //private static final int PORT = Integer.parseInt(System.getProperty("port", "7686"));
24
25 @Async("myTaskAsyncPool")26 public void run(intudpReceivePort) {27
28 EventLoopGroup group = newNioEventLoopGroup();29 log.info("Server start! Udp Receive msg Port:" +udpReceivePort );30
31 try{32 Bootstrap b = newBootstrap();33 b.group(group)34 .channel(NioDatagramChannel.class)35 .option(ChannelOption.SO_BROADCAST, true)36 .handler(newUdpServerHandler());37
38 b.bind(udpReceivePort).sync().channel().closeFuture().await();39 } catch(InterruptedException e) {40 e.printStackTrace();41 } finally{42 group.shutdownGracefully();43 }44 }45
46 }
此处NioDatagramChannel.class采用的是非阻塞的模式接受UDP消息,若是接受的UDP消息少,可以采用阻塞式的方式接受UDP消息。
UdpServer.run()方法使用@Async将该方法定义成异步的,myTaskAsyncPool是自定义的线程池。
7)UdpServerHandler.java
1 packagecom.example.demo.handle;2
3 importcom.example.demo.init.StartupEvent;4 importcom.example.demo.mod.UdpRecord;5 importcom.example.demo.repository.mysql.UdpRepository;6 importcom.example.demo.repository.redis.RedisRepository;7 importio.netty.buffer.Unpooled;8 importio.netty.channel.ChannelHandlerContext;9 importio.netty.channel.SimpleChannelInboundHandler;10 importio.netty.channel.socket.DatagramPacket;11 importio.netty.util.CharsetUtil;12 importorg.apache.commons.lang3.StringUtils;13 importorg.slf4j.Logger;14 importorg.slf4j.LoggerFactory;15
16 importjava.sql.Timestamp;17 importjava.util.Date;18
19 /**
20 * 接受UDP消息,并保存至redis的list链表中21 * Created by wj on 2017/8/30.22 *23 */
24
25 public class UdpServerHandler extends SimpleChannelInboundHandler{26
27 private static final Logger log= LoggerFactory.getLogger(UdpServerHandler.class);28
29 //用来计算server接收到多少UDP消息
30 private static int count = 0;31
32 @Override33 public void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throwsException {34
35 String receiveMsg =packet.content().toString(CharsetUtil.UTF_8);36
37 log.info("Received UDP Msg:" +receiveMsg);38
39 UdpRecord udpRecord = newUdpRecord();40
41 //判断接受到的UDP消息是否正确(未实现)
42 if(StringUtils.isNotEmpty(receiveMsg) ){43
44 //计算接收到的UDP消息的数量
45 count++;46
47 //获取UdpRepository对象,将接收UDP消息的日志保存至mysql中
48 udpRecord.setUdpMsg(receiveMsg);49 udpRecord.setTime(getTime());50 UdpRepository udpRepository = (UdpRepository) StartupEvent.getBean(UdpRepository.class);51 udpRepository.save(udpRecord);52
53 //获取RedirRepository对象
54 RedisRepository redisRepository = (RedisRepository) StartupEvent.getBean(RedisRepository.class);55 //将获取到的UDP消息保存至redis的list列表中
56 redisRepository.lpush("udp:msg", receiveMsg);57 redisRepository.setKey("UDPMsgNumber", String.valueOf(count));58
59
60 //在这里可以返回一个UDP消息给对方,告知已接收到UDP消息,但考虑到这是UDP消息,此处可以注释掉
61 ctx.write(newDatagramPacket(62 Unpooled.copiedBuffer("QOTM: " + "Got UDP Message!", CharsetUtil.UTF_8), packet.sender()));63
64 }else{65 log.error("Received Error UDP Messsage:" +receiveMsg);66 }67 }68
69 @Override70 public voidchannelReadComplete(ChannelHandlerContext ctx) {71 ctx.flush();72 }73
74 @Override75 public voidexceptionCaught(ChannelHandlerContext ctx, Throwable cause) {76 cause.printStackTrace();77 //We don't close the channel because we can keep serving requests.
78 }79
80 publicTimestamp getTime(){81 Date date = newDate();82 Timestamp time = newTimestamp(date.getTime());83 returntime;84 }85
86 }
此处若不借用ApplicationContext.getBean,是无法获取到RedisRepository对象的。
注:这里是无法使用注解@Autowired来获取到redisTemplate对象的。
8)repository
RedisRepository.java
1 packagecom.example.demo.repository.redis;2
3 importorg.slf4j.Logger;4 importorg.slf4j.LoggerFactory;5 importorg.springframework.beans.factory.annotation.Autowired;6 importorg.springframework.data.redis.core.RedisTemplate;7 importorg.springframework.stereotype.Service;8
9 /**
10 * 链接redis11 * 实现list lpush和rpop12 * Created by wj on 2017/8/30.13 */
14
15
16 @Service17 public classRedisRepository {18 private static final Logger log = LoggerFactory.getLogger(RedisRepository.class);19
20 @Autowired21 private RedisTemplateredisTemplate;22
23 //----------------String-----------------------
24 public voidsetKey(String key,String value){25 redisTemplate.opsForValue().set(key, value);26 }27
28
29 //----------------list----------------------
30 public Long lpush(String key, String val) throwsException{31 log.info("UDP Msg保存至redis中,key:" + key + ",val:" +val);32 returnredisTemplate.opsForList().leftPush(key, val);33 }34
35 public String rpop(String key) throwsException {36 returnredisTemplate.opsForList().rightPop(key);37 }38
39 }
使用springframework框架中的RedisTemplate类去链接redis,此处是将收到的UDP消息左保存(lpush)至list链表中,然后右边弹出(rpop)。
UdpRepository.java
1 packagecom.example.demo.repository.mysql;2
3 importcom.example.demo.mod.UdpRecord;4 importorg.springframework.data.jpa.repository.JpaRepository;5
6 /**
7 * Created by wj on 2017/8/31.8 */
9 public interface UdpRepository extends JpaRepository{10
11 }
定义Spring Data JPA接口,链接数据库。
其中
UdpRecord.java
1 packagecom.example.demo.mod;2
3 importjavax.persistence.Entity;4 importjavax.persistence.GeneratedValue;5 importjavax.persistence.Id;6 importjavax.persistence.Table;7 importjava.sql.Timestamp;8
9 /**
10 * Created by wj on 2017/8/31.11 *12 * 用来记录接收的UDP消息的日志13 */
14 @Entity15 @Table16 public classUdpRecord {17
18 private longid;19 privateString udpMsg;20 privateTimestamp time;21
22 @Id23 @GeneratedValue24 public longgetId() {25 returnid;26 }27
28 public void setId(longid) {29 this.id =id;30 }31
32 publicString getUdpMsg() {33 returnudpMsg;34 }35
36 public voidsetUdpMsg(String udpMsg) {37 this.udpMsg =udpMsg;38 }39
40 publicTimestamp getTime() {41 returntime;42 }43
44 public voidsetTime(Timestamp time) {45 this.time =time;46 }47 }
注解@Entity和@Table辨明这是一个实体类表格 ,其中的@Id和@GeneratedValue表明id是key值并且是自动递增的。
9)线程池的相关信息
TaskExecutePool.java
1 packagecom.example.demo.thread;2
3 importcom.example.demo.init.SysConfig;4 importorg.springframework.beans.factory.annotation.Autowired;5 importorg.springframework.context.annotation.Bean;6 importorg.springframework.context.annotation.Configuration;7 importorg.springframework.scheduling.annotation.EnableAsync;8 importorg.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;9
10 importjava.util.concurrent.Executor;11 importjava.util.concurrent.ThreadPoolExecutor;12
13 /**
14 * Created by wangjian on 2017/8/29.15 */
16 @Configuration17 @EnableAsync18 public classTaskExecutePool {19
20 @Autowired21 privateSysConfig config;22
23 @Bean24 publicExecutor myTaskAsyncPool() {25 ThreadPoolTaskExecutor executor = newThreadPoolTaskExecutor();26 executor.setCorePoolSize(config.getCorePoolSize());27 executor.setMaxPoolSize(config.getMaxPoolSize());28 executor.setQueueCapacity(config.getQueueCapacity());29 executor.setKeepAliveSeconds(config.getKeepAliveSeconds());30 executor.setThreadNamePrefix("MyExecutor-");31
32 //rejection-policy:当pool已经达到max size的时候,如何处理新任务33 //CALLER_RUNS:不在新线程中执行任务,而是由调用者所在的线程来执行
34 executor.setRejectedExecutionHandler(newThreadPoolExecutor.CallerRunsPolicy());35 executor.initialize();36 returnexecutor;37 }38 }
10)配置文件SysConfig.java
1 packagecom.example.demo.init;2
3 importorg.springframework.boot.context.properties.ConfigurationProperties;4 importorg.springframework.stereotype.Component;5
6 /**
7 * Created by wj on 2017/8/30.8 */
9 @Component10 @ConfigurationProperties(prefix="sysfig")11 public classSysConfig {12 private int UdpReceivePort;//UDP消息接收端口13
14 //线程池信息
15 private intCorePoolSize;16
17 private intMaxPoolSize;18
19 private intKeepAliveSeconds;20
21 private intQueueCapacity;22
23 public intgetCorePoolSize() {24 returnCorePoolSize;25 }26
27 public void setCorePoolSize(intcorePoolSize) {28 CorePoolSize =corePoolSize;29 }30
31 public intgetMaxPoolSize() {32 returnMaxPoolSize;33 }34
35 public void setMaxPoolSize(intmaxPoolSize) {36 MaxPoolSize =maxPoolSize;37 }38
39 public intgetKeepAliveSeconds() {40 returnKeepAliveSeconds;41 }42
43 public void setKeepAliveSeconds(intkeepAliveSeconds) {44 KeepAliveSeconds =keepAliveSeconds;45 }46
47 public intgetQueueCapacity() {48 returnQueueCapacity;49 }50
51 public void setQueueCapacity(intqueueCapacity) {52 QueueCapacity =queueCapacity;53 }54
55 public intgetUdpReceivePort() {56 returnUdpReceivePort;57 }58
59 public void setUdpReceivePort(intudpReceivePort) {60 UdpReceivePort =udpReceivePort;61 }62 }
11)小结
其实发送UDP和接收UDP消息的核心代码很简单,只是netty框架将其包装了。
UDP发送消息是
1 byte[] buffer =...2 InetAddress address = InetAddress.getByName("localhost");3
4 DatagramPacket packet = newDatagramPacket(5 buffer, buffer.length, address, 9999);6 DatagramSocket datagramSocket = newDatagramSocket();7 datagramSocket.send(packet);
udp接收消息是
1 DatagramSocket datagramSocket = new DatagramSocket(9999);2
3 byte[] buffer =....4 DatagramPacket packet = newDatagramPacket(buffer, buffer.length);5
6 datagramSocket.receive(packet);
看起来是不是很简单???
12)源代码下载地址
这里只公布了一个框架,其他很多部分由于涉及到了工作内容不便公布。
有需要的同学可以自行下载对其代码进行更改。
mysql udp服务器_netty学习:UDP服务器与Spring整合(2)相关推荐
- Struts2学习笔记——Struts2与Spring整合
Struts2与Spring整合后,可以使用Spring的配置文件applicationContext.xml来描述依赖关系,在Struts2的配置文件struts.xml来使用Spring创建的be ...
- spring boot mybatis 整合_MyBatis学习:MyBatis和Spring整合
1. 整合的工程结构 首先我们来看下整合之后的工程结构是什么样的. 2. 配置文件 在于spring整合之前,mybatis都是自己管理数据源的,然后sqlSessionFactory是我们自己去注入 ...
- Spring源码深度解析(郝佳)-学习-源码解析-Spring整合MyBatis
了解了MyBatis的单独使用过程之后,我们再来看看它也Spring整合的使用方式,比对之前的示例来找出Spring究竟为我们做了什么操作,哪些操作简化了程序开发. 准备spring71.xml &l ...
- SSM学习笔记4(Spring整合Mybatis,P26-P28,真吉尔难)
大概意思是先配置下替换下配置mapper那些XML的方式 核心对象一通我也听不懂的分析,分析出来是SqlSessionFactory.也不知道咋来的,先记下. 加MAVEN坐标忽略 配置SpringC ...
- 【Spring学习笔记 九】Spring声明式事务管理实现机制
什么是事务?事务就是把一系列的动作当成一个独立的工作单元,这些动作要么全部完成,要么全部不起作用,关乎数据准确性的地方我们一定要用到事务,防止业务逻辑出错. 什么是事务管理,事务管理对于企业应用而言至 ...
- 并发服务器设计思路,参考apache学习UDP和QoS,研究成果
研究了快1个月的服务器架构,把研究成果记录一下. 参考的有:Apache vlc ACE ftp 我主要需要其中的并发处理,内存管理,TCP/UDP.QoS,速度限制等方面的内容,所以着重说这几 ...
- 【Java 网络编程】UDP 服务器 客户端 通信 ( DatagramSocket | DatagramPacket | UDP 发送数据包 | UDP 接收数据包 | 端口号分配使用机制 )
文章目录 I UDP 信息发送接收原理 II UDP 发送和接收端口相同 III UDP 发送信息代码示例 IV UDP 接收信息代码示例 V UDP 服务器端代码示例 VI UDP 客户端代码示例 ...
- Linux下编写UDP/TCP版本的服务器和客户端的流程
Linux下编写UDP/TCP版本的服务器和客户端的流程 文章目录 Linux下编写UDP/TCP版本的服务器和客户端的流程 一:UDP和TCP的区别 二.UDP编写服务器的步骤 三.UDP编写客户端 ...
- go语言:200行代码做udp rtp转发并发分布服务器
使用go做服务器 最近写服务器使用c++多了以后,java和node逐渐被放到一边,最后又做了一个决定,使用go来做服务器,将会使用200行代码不到来做这个并发和分布式服务器,为什么? go语言的优势 ...
最新文章
- apache workprefork
- 每个人都是生活的导演
- 改了两天的bug,一个JWT解决了。。。
- maven 执行testng.xml文件失败解决问题
- 一个合理的生产环境的 Web 应用程序应该是什么样子的
- Ubuntu20.04 安装在U盘上
- MATLAB符号运算——积分
- struts1 和struts 2区别
- 【语篇标记练习题】Dismissing what was said
- 计算机网络教程 笔记整理
- 中南大学计算机学院复试2021,中南大学2021年硕士研究生拟录取名单汇总
- 杂篇:随笔编程杂谈录--《隆中对》
- c语言P0=0x是什么意思,0x80(单片机0x80什么意思)
- Ceph新长支持稳定版本Luminous(12.x.x)新功能总结
- winform模拟登陆网页_用c#模拟手机app向网站登录,如何写?
- PHP“垂死挣扎”的十年!
- Lucene DocValues详解
- 故宫博物院首次复原“天灯”“万寿灯”迎己亥春节
- 【python 爬虫】人人视频 API 接口解析
- 解析 static auto x = []() { std::ios::sync_with_stdio(false);std::cin.tie(nullptr);return 0;}()
热门文章
- c语言课程设计2018,C语言课程设计报告(2018)——学生管理系统(17页)-原创力文档...
- Spring基于Annotation实现事务管理
- C语言数组元素的查询
- 采用我国国产处理器的超级计算机是,“中国芯”超级电脑合肥诞生 首次采用国产CPU芯片...
- java算法概述,Java数据结构与算法基础(一)概述与线性结构
- 【OpenCV 例程200篇】81. 频率域高斯低通滤波器
- springboot 集成redis_一文详解Spring Boot 集成 Redis
- django后端用websocket传输数据
- 探索测试 | 新奇深层测试策略之案例剖析(一)
- mysql用代码建表基础语法