Http 调用netty 服务,服务调用客户端,伪同步响应.ProtoBuf 解决粘包,半包问题.
实际情况是: 公司需要开发一个接口给新产品使用,需求如下
1.有一款硬件设备,客户用usb接上电脑就可以,但是此设备功能比较单一,所以开发一个服务器程序,辅助此设备业务功能
2.解决方案,使用Socket调用此设备
3.增强此设备功能,增加Socket客户端连接到Socket服务端
4.Http请求,同步响应
测试注意:
1.nettyServer 在ubuntu下编码,使用Epoll
2.Http请求的测试最好运行再Linux 下进行,因为Windows 可能会因为并发高的时候占满端口限制,HttpClient或者RestTemplate 请求不了.
3.ProtoBuf 插件无论再Windows,还是linux同样适用,在linux 下,会自动下载 protoc-3.5.1-linux-x86_64.exe
简单的流程如下
解决方案:
1.使用Netty框架
2.使用ProtoBuf,配合Netty 对ProtoBuf解决半包问题
3.Future 实现伪同步响应
4.SpringBoot + jetty
pom.xml 添加ProtoBuf依赖以及插件
<properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><java.version>1.8</java.version><grpc.version>1.11.0</grpc.version><protobuf.version>3.5.1</protobuf.version></properties>
<dependency><groupId>com.google.protobuf</groupId><artifactId>protobuf-java</artifactId><version>${protobuf.version}</version></dependency><dependency><groupId>io.grpc</groupId><artifactId>grpc-netty</artifactId><version>${grpc.version}</version><scope>provided</scope></dependency><dependency><groupId>io.grpc</groupId><artifactId>grpc-protobuf</artifactId><version>${grpc.version}</version><scope>provided</scope></dependency><dependency><groupId>io.grpc</groupId><artifactId>grpc-stub</artifactId><version>${grpc.version}</version><scope>provided</scope></dependency><dependency><groupId>com.googlecode.protobuf-java-format</groupId><artifactId>protobuf-java-format</artifactId><version>1.4</version></dependency>
插件
<build><extensions><extension><groupId>kr.motd.maven</groupId><artifactId>os-maven-plugin</artifactId><version>1.5.0.Final</version></extension></extensions><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><groupId>org.xolstice.maven.plugins</groupId><artifactId>protobuf-maven-plugin</artifactId><version>0.5.1</version><configuration><protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact><pluginId>grpc-java</pluginId><pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}}:exe:${os.detected.classifier}</pluginArtifact></configuration><executions><execution><goals><goal>compile</goal><goal>compile-custom</goal></goals></execution></executions></plugin></plugins></build>
屏蔽Tomcat 使用 Jetty
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><exclusions><exclusion><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-tomcat</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-jetty</artifactId></dependency>
编写proto:再/src/main中创建文件夹 proto,创建一个Message.proto
文件内容
syntax = "proto3"; option java_package = "com.lzw.netty"; option java_outer_classname = "MessageProto"; message Message {int32 type = 1;sfixed64 id = 2;string msgBody = 3;enum Type {ACTIVE = 0;MESSAGE = 1;}}
生成java 文件
文件目录,挪到自己需要的包下面
服务端代码
/*** User: laizhenwei* Date: 2018-03-26 Time: 21:46* Description:*/ public class EchoServer {//缓存ResponseFuturepublic static Map<Long, ResponseFuture<MessageProto.Message>> responseFutureMap = new HashMap<>();private final int port;public EchoServer(int port) {this.port = port;}public void start() throws InterruptedException {EventLoopGroup bossGroup = new EpollEventLoopGroup(1);EventLoopGroup workerGroup = new EpollEventLoopGroup();ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossGroup,workerGroup).channel(EpollServerSocketChannel.class).localAddress(new InetSocketAddress(port)).childHandler(new MyServerChannelInitializer());try {ChannelFuture f = bootstrap.bind().sync();//清理不可预知而失败的脏数据f.channel().eventLoop().scheduleAtFixedRate(() -> {long nowTime = System.currentTimeMillis();responseFutureMap.entrySet().stream().filter(e -> (nowTime - e.getValue().getBeginTime()) > 60000).map(e -> e.getKey()).forEach(k->responseFutureMap.remove(k));}, 300, 300, TimeUnit.SECONDS);f.channel().closeFuture().sync();} finally {bossGroup.shutdownGracefully().sync();workerGroup.shutdownGracefully().sync();}} }
ContextHelper缓存ChannelHandlerContext
/*** User: laizhenwei* Date: 2018-03-26 Time: 21:46* Description: 缓存客户端的ChannelHandlerContext*/ public class ContextHelper {private final static Map<String, ChannelHandlerContext> clientMap = new ConcurrentHashMap<>();public static Map<String, ChannelHandlerContext> getClientMap() {return Collections.unmodifiableMap(clientMap);}public static ChannelHandlerContext get(String id){return clientMap.get(id);}public static void add(String id, ChannelHandlerContext ctx) {clientMap.put(id, ctx);}public static void remove(String id) {clientMap.remove(id);} }
MyServerHandler
/*** User: laizhenwei* Date: 2018-03-26 Time: 21:46* Description:*/ @Slf4j @ChannelHandler.Sharable public class MyServerHandler extends SimpleChannelInboundHandler<MessageProto.Message> {@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, MessageProto.Message msg) {String message = msg.getMsgBody();if ((MessageProto.Message.Type.ACTIVE_VALUE) == msg.getType()) {Attribute<String> attribute = channelHandlerContext.channel().attr(AttributeKey.valueOf("userName"));//连接上以后获取消息参数,设置到channelAttrString userName = message.split(":")[1];attribute.setIfAbsent(userName);//缓存channelHandlerContext ContextHelper.add(userName, channelHandlerContext);} else if (MessageProto.Message.Type.MESSAGE_VALUE == msg.getType()) {ResponseFuture<MessageProto.Message> resutl = EchoServer.responseFutureMap.get(msg.getId());if (resutl == null)log.warn("result is null ! msgId:" + msg.getId());MessageProto.Message message1 = MessageProto.Message.newBuilder().setId(msg.getId()).setType(MessageProto.Message.Type.MESSAGE_VALUE).setMsgBody("接收成功!msg:" + message).build();resutl.setResponse(message1);} // System.out.println("Client->Server:" + channelHandlerContext.channel().remoteAddress() + " send " + msg.getMsgBody()); }@Overridepublic void channelInactive(ChannelHandlerContext ctx){Attribute<String> attribute = ctx.channel().attr(AttributeKey.valueOf("userName"));ContextHelper.remove(attribute.get());}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();} }
ChannelInitializer,添加 Netty 支持 ProtoBuf 的拆包处理,以及编码解码
/*** User: laizhenwei* Date: 2018-03-26 Time: 21:46* Description:*/ public class MyServerChannelInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new ProtobufVarint32FrameDecoder()).addLast(new ProtobufDecoder(MessageProto.Message.getDefaultInstance())).addLast(new ProtobufVarint32LengthFieldPrepender()).addLast(new ProtobufEncoder()).addLast(new MyServerHandler());}}
ResponseFuture
@NoArgsConstructor public class ResponseFuture<T> implements Future<T> {// 因为请求和响应是一一对应的,因此初始化CountDownLatch值为1。private CountDownLatch latch = new CountDownLatch(1);// 响应结果private T response;// Futrue的请求时间,用于计算Future是否超时private long beginTime = System.currentTimeMillis();@Overridepublic boolean cancel(boolean mayInterruptIfRunning) {return false;}@Overridepublic boolean isCancelled() {return false;}@Overridepublic boolean isDone() {if (response != null)return true;return false;}// 获取响应结果,直到有结果才返回。 @Overridepublic T get() throws InterruptedException {latch.await();return this.response;}// 获取响应结果,直到有结果或者超过指定时间就返回。 @Overridepublic T get(long timeout, TimeUnit unit) throws InterruptedException {if (latch.await(timeout, unit))return this.response;return null;}// 用于设置响应结果,并且做countDown操作,通知请求线程public void setResponse(T response) {this.response = response;latch.countDown();}public long getBeginTime() {return beginTime;} }
ApplicationStartup SpringBoot 完全启动以后,运行Netty服务
/*** User: laizhenwei* Date: 2018-03-26 Time: 21:46* Description:*/ @Component public class ApplicationStartup implements CommandLineRunner {@Overridepublic void run(String... args) throws Exception {new EchoServer(5000).start();} }
客户端 EchoClient
/*** User: laizhenwei* Date: 2018-03-27 Time: 21:50* Description:*/ public class EchoClient {private final String host;private final int port;public EchoClient(String host,int port){this.host = host;this.port = port;}public void start(String userName) throws InterruptedException {EventLoopGroup group = new NioEventLoopGroup();Bootstrap b = new Bootstrap();b.group(group).channel(NioSocketChannel.class).remoteAddress(new InetSocketAddress(host,port)).handler(new ChannelInitializer<SocketChannel>(){@Overrideprotected void initChannel(SocketChannel socketChannel){socketChannel.attr(AttributeKey.valueOf("userName")).setIfAbsent(userName);socketChannel.pipeline().addLast(new ProtobufVarint32FrameDecoder()).addLast(new ProtobufDecoder(MessageProto.Message.getDefaultInstance())).addLast(new ProtobufVarint32LengthFieldPrepender()).addLast(new ProtobufEncoder()).addLast(new MyClientHandler());}});try {ChannelFuture f = b.connect().sync();f.channel().closeFuture().sync();}finally {group.shutdownGracefully().sync();}}public static void main(String[] args){threadRun("Athos");threadRun("Nero");threadRun("Dante");threadRun("Vergil");threadRun("lzw");threadRun("Churchill");threadRun("Peter");threadRun("Bob");}private static void threadRun(String userName){new Thread(()-> {try {new EchoClient("192.168.1.8",5000).start(userName);} catch (InterruptedException e) {e.printStackTrace();}}).start();}}
MyClientHandler
/*** User: laizhenwei* Date: 2018-04-09 Time: 11:20* Description:*/ @ChannelHandler.Sharable public class MyClientHandler extends SimpleChannelInboundHandler<MessageProto.Message> {@Overridepublic void channelActive(ChannelHandlerContext ctx) {Attribute<Object> attribute = ctx.channel().attr(AttributeKey.valueOf("userName"));String m = "userName:" + attribute.get();MessageProto.Message.Builder builder = MessageProto.Message.newBuilder();builder.setType(MessageProto.Message.Type.ACTIVE_VALUE).setMsgBody(m);ctx.writeAndFlush(builder.build());}@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, MessageProto.Message msg) {MessageProto.Message.Builder builder = MessageProto.Message.newBuilder();//把接收到的消息写回到服务端 builder.setId(msg.getId()).setType(MessageProto.Message.Type.MESSAGE_VALUE).setMsgBody(msg.getMsgBody());channelHandlerContext.channel().writeAndFlush(builder.build());}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.close();}}
JunitTest
@Testpublic void testRest() throws InterruptedException {final Gson gson = new Gson();AtomicLong atomicLong = new AtomicLong(0);ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(50);executor.setMaxPoolSize(50);executor.setQueueCapacity(512);executor.setThreadNamePrefix("Executor-");executor.setAllowCoreThreadTimeOut(false);executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());executor.setWaitForTasksToCompleteOnShutdown(true);executor.initialize();String[] userNames = {"Athos", "Nero", "Dante", "Vergil", "lzw", "Churchill", "Peter", "Bob"};// String[] userNames = {"Athos"}; RestTemplate restTemplate = new RestTemplate();HttpHeaders httpHeaders = new HttpHeaders();httpHeaders.setAccept(Arrays.asList(MediaType.APPLICATION_JSON_UTF8));httpHeaders.add("connection", "keep-alive"); // httpHeaders.setConnection("close");List<CompletableFuture<Boolean>> futures = new ArrayList<>();long begin = System.nanoTime();Arrays.stream(userNames).forEach(userName -> new Thread(() -> {for (int i = 0; i < 100000; i++) {futures.add(CompletableFuture.supplyAsync(() -> {long currentId = atomicLong.getAndIncrement();MultiValueMap<String, String> params = new LinkedMultiValueMap<>();params.add("userName", userName);params.add("msg", "你好啊!" + currentId);HttpEntity<MultiValueMap<String, String>> httpEntity = new HttpEntity<>(params, httpHeaders);String response = restTemplate.postForObject("http://192.168.91.130:8010/process", httpEntity, String.class);if (response != null) {Map<String, Object> responseMap;responseMap = gson.fromJson(response, HashMap.class);return responseMap.get("msgBody").equals("接收成功!msg:你好啊!" + currentId);}return false;}, executor));}}).start());while(futures.size()!=(100000*userNames.length)){TimeUnit.MILLISECONDS.sleep(500);}List<Boolean> result = futures.stream().map(CompletableFuture::join).collect(Collectors.toList());System.out.println((System.nanoTime() - begin) / 1000000);result.stream().filter(r -> !r).forEach(r -> System.out.println(r));}
1.启动NettyServer
2.启动NettyClient
3.启动N个JunitTest windows 启动5个,Linux 启动5个
看看server输出,从请求到响应非常迅速
Client 多个线程也没有看到输出有false,证明伪同步响应成功
转载于:https://www.cnblogs.com/sweetchildomine/p/8798493.html
Http 调用netty 服务,服务调用客户端,伪同步响应.ProtoBuf 解决粘包,半包问题.相关推荐
- Netty粘包/半包问题解析
目录 一.什么是粘包/半包问题 二.TCP粘包/半包发生的原因 三.粘包/半包解决办法 四.Netty中粘包/半包解决示例 1. 采用固定长度数据包编解码方式 2. 采用特殊字符作为边界字符编解码方式 ...
- 三、Netty的粘包半包问题解决
一.定义 TCP 传输中,客户端发送数据,实际是把数据写入到了 TCP 的缓存中,粘包和半包也就会在此时产生.客户端给服务端发送了两条消息ABC和DEF,服务端这边的接收会有多少种情况呢?有可能是一次 ...
- Netty框架之TCP粘包/半包解决方案
Netty框架之TCP粘包/半包解决方案 一.TCP粘包 二.TCP半包 三.TCP粘包/半包解决方案 1.FixedLengthFrameDecoder定长解析器 2.LineBasedFrameD ...
- netty——黏包半包的解决方案、滑动窗口的概念
黏包半包 滑动窗口 在深入理解黏包半包问题之前,先了解TCP的一个知识点--滑动窗口 我们都指定tcp是一种可靠的传输协议,这主要是因为在tcp中客户端给服务器端发送一条消息,要等待服务器端的应答,如 ...
- TCP 粘包半包 netty 编解码 三者关系
1 何为粘包 / 半包? 对方一次性接收了多条消息这种现象,我们就称之为 粘包现象. 对方多次接收了不完整消息这种现象,我们就称之为 半包现象. 粘包的原因: 发送方发送的消息 < 缓冲区大小 ...
- Netty如何解决粘包半包问题
何为粘包 / 半包? 比如,我们发送两条消息:ABC 和 DEF,那么对方收到的就一定是 ABC 和 DEF 吗? 不一定,对方可能一次就把两条消息接收完了,即 ABCDEF:也可能分成了好多次,比如 ...
- Netty(一)基础socketchannel,Buffer,selector黏包 半包解决 实战
NIO 基础 non-blockiong io:非阻塞 阻塞vs非阻塞 三大组件 1.channel & Buffer channel : 双向流 Buffer:暂存数据 缓冲区 常见chan ...
- 04、Netty学习笔记—(黏包半包及协议设计解析)
文章目录 一.粘包与半包 1.1.现象分析 1.1.1.粘包.半包情况分析 1.1.2.滑动窗口.MSS限制.Nagle算法介绍 1.2.粘包.半包现象复现 1.2.1.粘包复现 1.2.2.半包复现 ...
- netty之黏包半包问题详解
文章目录 黏包问题 半包问题 出现问题的原因 解决方案 短连接 定长解码器 行解码器 LTC解码器 黏包问题 在客户端循环发送十次数据,服务器端会出现黏包现象. ChannelFuture futur ...
最新文章
- Java学习(16)--System 类/Date 类/ Calendar类
- MySQL 超时解决方案mysql报错处理: could not be resolved: getnameinfo() returned error (code: -3)...
- 2020 我的C++学习之路 C++PrimerPlus第十章课后习题
- 基于android的简单网页_成都APP开发:APP原生和网页开发有什么区别?
- 蔚来汽车股票一日成交额达274亿美元 居美股榜首
- vscode 网页版
- 笨方法学python 习题41
- 为什么csrss进程有三个_什么是客户端服务器运行时进程(csrss.exe),为什么在我的PC上运行它?...
- SweepProfile for 3dMax石膏线生成插件使用教程
- C#学习笔记五——选择文件、文件夹操作
- Windows10无法启动windows安全中心服务怎么办?
- oracle固定资产部门分摊,用友软件操作手册--固定资产--多部门使用、分摊处理...
- 阿里云如何购买mysql_如何选购配置阿里云数据库RDS MySQL的流程 新手必看
- 活动倒计时HTML,活动倒计时代码(精确到毫秒)jquery插件
- DVE C++ 中批量注释快捷键 Ctrl + /
- 现实中的项目范围变更
- 90后浙理工硕士论文登Nature,毕业却考取公务员!网友:宇宙尽头是考公
- mac上卸载Anyconnect后 重新安装失败
- 上海“网红流浪汉”你站住!To B人有话对你说~~~
- 给李开复:“创新工厂”最大的问题