实际情况是: 公司需要开发一个接口给新产品使用,需求如下

1.有一款硬件设备,客户用usb接上电脑就可以,但是此设备功能比较单一,所以开发一个服务器程序,辅助此设备业务功能

2.解决方案,使用Socket调用此设备

3.增强此设备功能,增加Socket客户端连接到Socket服务端

4.Http请求,同步响应

测试注意:

1.nettyServer 在ubuntu下编码,使用Epoll

2.Http请求的测试最好运行再Linux 下进行,因为Windows 可能会因为并发高的时候占满端口限制,HttpClient或者RestTemplate 请求不了.

3.ProtoBuf 插件无论再Windows,还是linux同样适用,在linux 下,会自动下载 protoc-3.5.1-linux-x86_64.exe

简单的流程如下

解决方案:

1.使用Netty框架

2.使用ProtoBuf,配合Netty 对ProtoBuf解决半包问题

3.Future 实现伪同步响应

4.SpringBoot + jetty

pom.xml 添加ProtoBuf依赖以及插件

 <properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><java.version>1.8</java.version><grpc.version>1.11.0</grpc.version><protobuf.version>3.5.1</protobuf.version></properties>

  <dependency><groupId>com.google.protobuf</groupId><artifactId>protobuf-java</artifactId><version>${protobuf.version}</version></dependency><dependency><groupId>io.grpc</groupId><artifactId>grpc-netty</artifactId><version>${grpc.version}</version><scope>provided</scope></dependency><dependency><groupId>io.grpc</groupId><artifactId>grpc-protobuf</artifactId><version>${grpc.version}</version><scope>provided</scope></dependency><dependency><groupId>io.grpc</groupId><artifactId>grpc-stub</artifactId><version>${grpc.version}</version><scope>provided</scope></dependency><dependency><groupId>com.googlecode.protobuf-java-format</groupId><artifactId>protobuf-java-format</artifactId><version>1.4</version></dependency>

插件

    <build><extensions><extension><groupId>kr.motd.maven</groupId><artifactId>os-maven-plugin</artifactId><version>1.5.0.Final</version></extension></extensions><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><groupId>org.xolstice.maven.plugins</groupId><artifactId>protobuf-maven-plugin</artifactId><version>0.5.1</version><configuration><protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact><pluginId>grpc-java</pluginId><pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}}:exe:${os.detected.classifier}</pluginArtifact></configuration><executions><execution><goals><goal>compile</goal><goal>compile-custom</goal></goals></execution></executions></plugin></plugins></build>

屏蔽Tomcat 使用 Jetty

        <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><exclusions><exclusion><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-tomcat</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-jetty</artifactId></dependency>

编写proto:再/src/main中创建文件夹 proto,创建一个Message.proto

文件内容

syntax = "proto3";
option java_package = "com.lzw.netty";
option java_outer_classname = "MessageProto";
message Message {int32 type = 1;sfixed64 id = 2;string msgBody = 3;enum Type {ACTIVE = 0;MESSAGE = 1;}}

生成java 文件

文件目录,挪到自己需要的包下面

服务端代码

/*** User: laizhenwei* Date: 2018-03-26 Time: 21:46* Description:*/
public class EchoServer {//缓存ResponseFuturepublic static Map<Long, ResponseFuture<MessageProto.Message>> responseFutureMap = new HashMap<>();private final int port;public EchoServer(int port) {this.port = port;}public void start() throws InterruptedException {EventLoopGroup bossGroup = new EpollEventLoopGroup(1);EventLoopGroup workerGroup = new EpollEventLoopGroup();ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossGroup,workerGroup).channel(EpollServerSocketChannel.class).localAddress(new InetSocketAddress(port)).childHandler(new MyServerChannelInitializer());try {ChannelFuture f = bootstrap.bind().sync();//清理不可预知而失败的脏数据f.channel().eventLoop().scheduleAtFixedRate(() -> {long nowTime = System.currentTimeMillis();responseFutureMap.entrySet().stream().filter(e -> (nowTime - e.getValue().getBeginTime()) > 60000).map(e -> e.getKey()).forEach(k->responseFutureMap.remove(k));}, 300, 300, TimeUnit.SECONDS);f.channel().closeFuture().sync();} finally {bossGroup.shutdownGracefully().sync();workerGroup.shutdownGracefully().sync();}}
}

ContextHelper缓存ChannelHandlerContext

/*** User: laizhenwei* Date: 2018-03-26 Time: 21:46* Description: 缓存客户端的ChannelHandlerContext*/
public class ContextHelper {private final static Map<String, ChannelHandlerContext> clientMap = new ConcurrentHashMap<>();public static Map<String, ChannelHandlerContext> getClientMap() {return Collections.unmodifiableMap(clientMap);}public static ChannelHandlerContext get(String id){return clientMap.get(id);}public static void add(String id, ChannelHandlerContext ctx) {clientMap.put(id, ctx);}public static void remove(String id) {clientMap.remove(id);}
}

MyServerHandler

/*** User: laizhenwei* Date: 2018-03-26 Time: 21:46* Description:*/
@Slf4j
@ChannelHandler.Sharable
public class MyServerHandler extends SimpleChannelInboundHandler<MessageProto.Message> {@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, MessageProto.Message msg) {String message = msg.getMsgBody();if ((MessageProto.Message.Type.ACTIVE_VALUE) == msg.getType()) {Attribute<String> attribute = channelHandlerContext.channel().attr(AttributeKey.valueOf("userName"));//连接上以后获取消息参数,设置到channelAttrString userName = message.split(":")[1];attribute.setIfAbsent(userName);//缓存channelHandlerContext
            ContextHelper.add(userName, channelHandlerContext);} else if (MessageProto.Message.Type.MESSAGE_VALUE == msg.getType()) {ResponseFuture<MessageProto.Message> resutl = EchoServer.responseFutureMap.get(msg.getId());if (resutl == null)log.warn("result is null ! msgId:" + msg.getId());MessageProto.Message message1 = MessageProto.Message.newBuilder().setId(msg.getId()).setType(MessageProto.Message.Type.MESSAGE_VALUE).setMsgBody("接收成功!msg:" + message).build();resutl.setResponse(message1);}
//        System.out.println("Client->Server:" + channelHandlerContext.channel().remoteAddress() + " send " + msg.getMsgBody());
    }@Overridepublic void channelInactive(ChannelHandlerContext ctx){Attribute<String> attribute = ctx.channel().attr(AttributeKey.valueOf("userName"));ContextHelper.remove(attribute.get());}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();}
}

ChannelInitializer,添加 Netty 支持 ProtoBuf 的拆包处理,以及编码解码

/*** User: laizhenwei* Date: 2018-03-26 Time: 21:46* Description:*/
public class MyServerChannelInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new ProtobufVarint32FrameDecoder()).addLast(new ProtobufDecoder(MessageProto.Message.getDefaultInstance())).addLast(new ProtobufVarint32LengthFieldPrepender()).addLast(new ProtobufEncoder()).addLast(new MyServerHandler());}}

ResponseFuture

@NoArgsConstructor
public class ResponseFuture<T> implements Future<T> {// 因为请求和响应是一一对应的,因此初始化CountDownLatch值为1。private CountDownLatch latch = new CountDownLatch(1);// 响应结果private T response;// Futrue的请求时间,用于计算Future是否超时private long beginTime = System.currentTimeMillis();@Overridepublic boolean cancel(boolean mayInterruptIfRunning) {return false;}@Overridepublic boolean isCancelled() {return false;}@Overridepublic boolean isDone() {if (response != null)return true;return false;}// 获取响应结果,直到有结果才返回。
    @Overridepublic T get() throws InterruptedException {latch.await();return this.response;}// 获取响应结果,直到有结果或者超过指定时间就返回。
    @Overridepublic T get(long timeout, TimeUnit unit) throws InterruptedException {if (latch.await(timeout, unit))return this.response;return null;}// 用于设置响应结果,并且做countDown操作,通知请求线程public void setResponse(T response) {this.response = response;latch.countDown();}public long getBeginTime() {return beginTime;}
}

ApplicationStartup SpringBoot 完全启动以后,运行Netty服务

/*** User: laizhenwei* Date: 2018-03-26 Time: 21:46* Description:*/
@Component
public class ApplicationStartup implements CommandLineRunner {@Overridepublic void run(String... args) throws Exception {new EchoServer(5000).start();}
}

客户端 EchoClient

/*** User: laizhenwei* Date: 2018-03-27 Time: 21:50* Description:*/
public class EchoClient {private final String host;private final int port;public EchoClient(String host,int port){this.host = host;this.port = port;}public void start(String userName) throws InterruptedException {EventLoopGroup group = new NioEventLoopGroup();Bootstrap b = new Bootstrap();b.group(group).channel(NioSocketChannel.class).remoteAddress(new InetSocketAddress(host,port)).handler(new ChannelInitializer<SocketChannel>(){@Overrideprotected void initChannel(SocketChannel socketChannel){socketChannel.attr(AttributeKey.valueOf("userName")).setIfAbsent(userName);socketChannel.pipeline().addLast(new ProtobufVarint32FrameDecoder()).addLast(new ProtobufDecoder(MessageProto.Message.getDefaultInstance())).addLast(new ProtobufVarint32LengthFieldPrepender()).addLast(new ProtobufEncoder()).addLast(new MyClientHandler());}});try {ChannelFuture f = b.connect().sync();f.channel().closeFuture().sync();}finally {group.shutdownGracefully().sync();}}public static void main(String[] args){threadRun("Athos");threadRun("Nero");threadRun("Dante");threadRun("Vergil");threadRun("lzw");threadRun("Churchill");threadRun("Peter");threadRun("Bob");}private static void threadRun(String userName){new Thread(()-> {try {new EchoClient("192.168.1.8",5000).start(userName);} catch (InterruptedException e) {e.printStackTrace();}}).start();}}

MyClientHandler

/*** User: laizhenwei* Date: 2018-04-09 Time: 11:20* Description:*/
@ChannelHandler.Sharable
public class MyClientHandler extends SimpleChannelInboundHandler<MessageProto.Message> {@Overridepublic void channelActive(ChannelHandlerContext ctx) {Attribute<Object> attribute = ctx.channel().attr(AttributeKey.valueOf("userName"));String m = "userName:" + attribute.get();MessageProto.Message.Builder builder = MessageProto.Message.newBuilder();builder.setType(MessageProto.Message.Type.ACTIVE_VALUE).setMsgBody(m);ctx.writeAndFlush(builder.build());}@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, MessageProto.Message msg) {MessageProto.Message.Builder builder = MessageProto.Message.newBuilder();//把接收到的消息写回到服务端
        builder.setId(msg.getId()).setType(MessageProto.Message.Type.MESSAGE_VALUE).setMsgBody(msg.getMsgBody());channelHandlerContext.channel().writeAndFlush(builder.build());}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.close();}}

JunitTest

    @Testpublic void testRest() throws InterruptedException {final Gson gson = new Gson();AtomicLong atomicLong = new AtomicLong(0);ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(50);executor.setMaxPoolSize(50);executor.setQueueCapacity(512);executor.setThreadNamePrefix("Executor-");executor.setAllowCoreThreadTimeOut(false);executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());executor.setWaitForTasksToCompleteOnShutdown(true);executor.initialize();String[] userNames = {"Athos", "Nero", "Dante", "Vergil", "lzw", "Churchill", "Peter", "Bob"};//        String[] userNames = {"Athos"};
RestTemplate restTemplate = new RestTemplate();HttpHeaders httpHeaders = new HttpHeaders();httpHeaders.setAccept(Arrays.asList(MediaType.APPLICATION_JSON_UTF8));httpHeaders.add("connection", "keep-alive");
//        httpHeaders.setConnection("close");List<CompletableFuture<Boolean>> futures = new ArrayList<>();long begin = System.nanoTime();Arrays.stream(userNames).forEach(userName -> new Thread(() -> {for (int i = 0; i < 100000; i++) {futures.add(CompletableFuture.supplyAsync(() -> {long currentId = atomicLong.getAndIncrement();MultiValueMap<String, String> params = new LinkedMultiValueMap<>();params.add("userName", userName);params.add("msg", "你好啊!" + currentId);HttpEntity<MultiValueMap<String, String>> httpEntity = new HttpEntity<>(params, httpHeaders);String response = restTemplate.postForObject("http://192.168.91.130:8010/process", httpEntity, String.class);if (response != null) {Map<String, Object> responseMap;responseMap = gson.fromJson(response, HashMap.class);return responseMap.get("msgBody").equals("接收成功!msg:你好啊!" + currentId);}return false;}, executor));}}).start());while(futures.size()!=(100000*userNames.length)){TimeUnit.MILLISECONDS.sleep(500);}List<Boolean> result = futures.stream().map(CompletableFuture::join).collect(Collectors.toList());System.out.println((System.nanoTime() - begin) / 1000000);result.stream().filter(r -> !r).forEach(r -> System.out.println(r));}

1.启动NettyServer

2.启动NettyClient

3.启动N个JunitTest windows 启动5个,Linux 启动5个

看看server输出,从请求到响应非常迅速

Client 多个线程也没有看到输出有false,证明伪同步响应成功

转载于:https://www.cnblogs.com/sweetchildomine/p/8798493.html

Http 调用netty 服务,服务调用客户端,伪同步响应.ProtoBuf 解决粘包,半包问题.相关推荐

  1. Netty粘包/半包问题解析

    目录 一.什么是粘包/半包问题 二.TCP粘包/半包发生的原因 三.粘包/半包解决办法 四.Netty中粘包/半包解决示例 1. 采用固定长度数据包编解码方式 2. 采用特殊字符作为边界字符编解码方式 ...

  2. 三、Netty的粘包半包问题解决

    一.定义 TCP 传输中,客户端发送数据,实际是把数据写入到了 TCP 的缓存中,粘包和半包也就会在此时产生.客户端给服务端发送了两条消息ABC和DEF,服务端这边的接收会有多少种情况呢?有可能是一次 ...

  3. Netty框架之TCP粘包/半包解决方案

    Netty框架之TCP粘包/半包解决方案 一.TCP粘包 二.TCP半包 三.TCP粘包/半包解决方案 1.FixedLengthFrameDecoder定长解析器 2.LineBasedFrameD ...

  4. netty——黏包半包的解决方案、滑动窗口的概念

    黏包半包 滑动窗口 在深入理解黏包半包问题之前,先了解TCP的一个知识点--滑动窗口 我们都指定tcp是一种可靠的传输协议,这主要是因为在tcp中客户端给服务器端发送一条消息,要等待服务器端的应答,如 ...

  5. TCP 粘包半包 netty 编解码 三者关系

    1 何为粘包 / 半包? 对方一次性接收了多条消息这种现象,我们就称之为 粘包现象. 对方多次接收了不完整消息这种现象,我们就称之为 半包现象. 粘包的原因: 发送方发送的消息 < 缓冲区大小 ...

  6. Netty如何解决粘包半包问题

    何为粘包 / 半包? 比如,我们发送两条消息:ABC 和 DEF,那么对方收到的就一定是 ABC 和 DEF 吗? 不一定,对方可能一次就把两条消息接收完了,即 ABCDEF:也可能分成了好多次,比如 ...

  7. Netty(一)基础socketchannel,Buffer,selector黏包 半包解决 实战

    NIO 基础 non-blockiong io:非阻塞 阻塞vs非阻塞 三大组件 1.channel & Buffer channel : 双向流 Buffer:暂存数据 缓冲区 常见chan ...

  8. 04、Netty学习笔记—(黏包半包及协议设计解析)

    文章目录 一.粘包与半包 1.1.现象分析 1.1.1.粘包.半包情况分析 1.1.2.滑动窗口.MSS限制.Nagle算法介绍 1.2.粘包.半包现象复现 1.2.1.粘包复现 1.2.2.半包复现 ...

  9. netty之黏包半包问题详解

    文章目录 黏包问题 半包问题 出现问题的原因 解决方案 短连接 定长解码器 行解码器 LTC解码器 黏包问题 在客户端循环发送十次数据,服务器端会出现黏包现象. ChannelFuture futur ...

最新文章

  1. Java学习(16)--System 类/Date 类/ Calendar类
  2. MySQL 超时解决方案mysql报错处理: could not be resolved: getnameinfo() returned error (code: -3)...
  3. 2020 我的C++学习之路 C++PrimerPlus第十章课后习题
  4. 基于android的简单网页_成都APP开发:APP原生和网页开发有什么区别?
  5. 蔚来汽车股票一日成交额达274亿美元 居美股榜首
  6. vscode 网页版
  7. 笨方法学python 习题41
  8. 为什么csrss进程有三个_什么是客户端服务器运行时进程(csrss.exe),为什么在我的PC上运行它?...
  9. SweepProfile for 3dMax石膏线生成插件使用教程
  10. C#学习笔记五——选择文件、文件夹操作
  11. Windows10无法启动windows安全中心服务怎么办?
  12. oracle固定资产部门分摊,用友软件操作手册--固定资产--多部门使用、分摊处理...
  13. 阿里云如何购买mysql_如何选购配置阿里云数据库RDS MySQL的流程 新手必看
  14. 活动倒计时HTML,活动倒计时代码(精确到毫秒)jquery插件
  15. DVE C++ 中批量注释快捷键 Ctrl + /
  16. 现实中的项目范围变更
  17. 90后浙理工硕士论文登Nature,毕业却考取公务员!网友:宇宙尽头是考公
  18. mac上卸载Anyconnect后 重新安装失败
  19. 上海“网红流浪汉”你站住!To B人有话对你说~~~
  20. 给李开复:“创新工厂”最大的问题

热门文章

  1. 核电站计算机专业是干什么的,什么叫核电站?它是干什么用的呢?
  2. VMware虚拟机怎么用U盘装win7系统
  3. DSPE-PEG近年来在长循环脂质体、高分子胶束等药物载体中的应用获得了较快发展
  4. 人机交互-2-交互设计的原则与方法
  5. mysql 消息队列_一个简单的 MySQL 批量事务消息队列
  6. 高通量测序的数据处理与分析指北(一)_network
  7. 推荐系统实践Task1:熟悉新闻推荐系统基本流程
  8. 面试项目经理,这12个问题一定会被问到(建议收藏)
  9. 条码软件如何添加图片
  10. GRBL学习-常用G代码