欢迎访问我的GitHub

这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos

《java版gRPC实战》全系列链接

  1. 用proto生成代码
  2. 服务发布和调用
  3. 服务端流
  4. 客户端流
  5. 双向流
  6. 客户端动态获取服务端地址
  7. 基于eureka的注册发现

本篇概览

  • 本文是《java版gRPC实战》系列的第四篇,前文掌握了服务端流,适合从服务端获取大量数据的场景,今天的目标是掌握客户端流类型的服务,包括服务提供方和使用方两侧的开发;
  • 先来看看官方资料对客户端流式RPC的介绍:客户端写入一个消息序列并将其发送到服务器,同样也是使用流。一旦客户端完成写入消息,它等待服务器完成读取返回它的响应;
  • 本文由以下几部分组成:
  1. 提前小结几个重要的知识点,稍后开发过程中要重点关注这几个地方;
  2. 在proto文件中定义客户端流类型的gRPC接口,再通过proto生成java代码;
  3. 开发服务端应用;
  4. 开发客户端应用;
  5. 验证;

提前小结

为了突出重点,这里将几个关键的知识点提前给出:

  1. 客户端流的特点,是请求方以流的形式提交数据到响应方;
  2. 一次RPC请求中,请求方可以通过流的方式源源不断的提交数据,直到调用了StreamObserver的onCompleted方法,才算提交数据完成;
  3. 平时咱们调用方法时,方法内部用到的数据是通过入参传进来的,但这里不一样,客户端要传给服务端的数据和gRPC方法的入参没有关系,而是和方法的返回对象有关(执行返回对象的onNext方法可以将数据传给服务端);
  4. 客户端在A线程上传完数据后,服务端的响应是在另一个线程B执行的,因此,如果A线程拿到服务端响应,就要B线程的异步响应方法执行完毕,等待的方法有多种,我用的是CountDownLatch;
  5. 在服务端,开发者要编写的代码和以往web开发不同,不是将数据处理好返回,而是返回一个StreamObserver实例给上层框架,由框架负责处理的逻辑,开发者专注开发StreamObserver的实现即可,例如重写onNext方法,客户端通过流每上传一笔数据,onNext方法都会被外层框架执行一次;
  6. 如果您用的是IDEA,记得勾选下图红框中的选框,否则运行应用的时候可能遇到lombok相关的问题:
  • 上面提到的这些,会在接下来的开发过程中充分体现出来;

源码下载

  • 本篇实战中的完整源码可在GitHub下载到,地址和链接信息如下表所示(https://github.com/zq2599/blog_demos):
名称 链接 备注
项目主页 https://github.com/zq2599/blog_demos 该项目在GitHub上的主页
git仓库地址(https) https://github.com/zq2599/blog_demos.git 该项目源码的仓库地址,https协议
git仓库地址(ssh) git@github.com:zq2599/blog_demos.git 该项目源码的仓库地址,ssh协议
  • 这个git项目中有多个文件夹,《java版gRPC实战》系列的源码在grpc-tutorials文件夹下,如下图红框所示:
  • grpc-tutorials文件夹下有多个目录,本篇文章对应的服务端代码在client-stream-server-side目录下,客户端代码在client-stream-client-side目录下,如下图:

在proto文件中定义客户端流类型的gRPC接口

  • 首先要做的就是定义gRPC接口,打开mall.proto,在里面新增方法和相关的数据结构,需要重点关注的是AddToCart方法的入参ProductOrder前面添加了stream修饰,代表该方法是客户端流类型:
// gRPC服务,这是个在线商城的购物车服务
service CartService {// 客户端流式:添加多个商品到购物车rpc AddToCart (stream ProductOrder) returns (AddCartReply) {}
}// 提交购物车时的产品信息
message ProductOrder {// 商品IDint32 productId = 1;// 商品数量int32 number = 2;
}// 提交购物车返回结果的数据结构
message AddCartReply {// 返回码int32 code = 1;// 描述信息string message = 2;
}
  • 双击下图红框中的task即可生成java代码:
  • 生成下图红框中的文件:
  • 接下来开发服务端;

开发服务端应用

  • 在父工程grpc-turtorials下面新建名为client-stream-server-side的模块,其build.gradle内容如下:
// 使用springboot插件
plugins {id 'org.springframework.boot'
}dependencies {implementation 'org.projectlombok:lombok'implementation 'org.springframework.boot:spring-boot-starter'// 作为gRPC服务提供方,需要用到此库implementation 'net.devh:grpc-server-spring-boot-starter'// 依赖自动生成源码的工程implementation project(':grpc-lib')// annotationProcessor不会传递,使用了lombok生成代码的模块,需要自己声明annotationProcessorannotationProcessor 'org.projectlombok:lombok'
}
  • 配置文件application.yml:
spring:application:name: client-stream-server-side
# gRPC有关的配置,这里只需要配置服务端口号
grpc:server:port: 9900
  • 启动类ClientStreamServerSideApplication.java的代码就不贴了,普通的springboot启动类而已;
  • 重点是提供grpc服务的GrpcServerService.java,请结合前面小结的第五点来阅读代码,咱们要做的就是给上层框架返回一个匿名类,至于里面的onNext、onCompleted方法何时被调用是上层框架决定的,另外还准备了成员变量totalCount,这样就可以记录总数了:
package com.bolingcavalry.grpctutorials;import com.bolingcavalry.grpctutorials.lib.AddCartReply;
import com.bolingcavalry.grpctutorials.lib.CartServiceGrpc;
import com.bolingcavalry.grpctutorials.lib.ProductOrder;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
import net.devh.boot.grpc.server.service.GrpcService;@GrpcService
@Slf4j
public class GrpcServerService extends CartServiceGrpc.CartServiceImplBase {@Overridepublic StreamObserver<ProductOrder> addToCart(StreamObserver<AddCartReply> responseObserver) {// 返回匿名类,给上层框架使用return new StreamObserver<ProductOrder>() {// 记录处理产品的总量private int totalCount = 0;@Overridepublic void onNext(ProductOrder value) {log.info("正在处理商品[{}],数量为[{}]",value.getProductId(),value.getNumber());// 增加总量totalCount += value.getNumber();}@Overridepublic void onError(Throwable t) {log.error("添加购物车异常", t);}@Overridepublic void onCompleted() {log.info("添加购物车完成,共计[{}]件商品", totalCount);responseObserver.onNext(AddCartReply.newBuilder().setCode(10000).setMessage(String.format("添加购物车完成,共计[%d]件商品", totalCount)).build());responseObserver.onCompleted();}};}
}

开发客户端应用

  • 在父工程grpc-turtorials下面新建名为client-stream-server-side的模块,其build.gradle内容如下:
plugins {id 'org.springframework.boot'
}dependencies {implementation 'org.projectlombok:lombok'implementation 'org.springframework.boot:spring-boot-starter'implementation 'org.springframework.boot:spring-boot-starter-web'implementation 'net.devh:grpc-client-spring-boot-starter'implementation project(':grpc-lib')
}
  • 配置文件application.yml,设置自己的web端口号和服务端地址:
server:port: 8082
spring:application:name: client-stream-client-sidegrpc:client:# gRPC配置的名字,GrpcClient注解会用到client-stream-server-side:# gRPC服务端地址address: 'static://127.0.0.1:9900'enableKeepAlive: truekeepAliveWithoutCalls: truenegotiationType: plaintext
  • 启动类ClientStreamClientSideApplication.java的代码就不贴了,普通的springboot启动类而已;
  • 正常情况下我们都是用StreamObserver处理服务端响应,这里由于是异步响应,需要额外的方法从StreamObserver中取出业务数据,于是定一个新接口,继承自StreamObserver,新增getExtra方法可以返回String对象,详细的用法稍后会看到:
package com.bolingcavalry.grpctutorials;import io.grpc.stub.StreamObserver;public interface ExtendResponseObserver<T> extends StreamObserver<T> {String getExtra();
}
  • 重头戏来了,看看如何远程调用客户端流类型的gRPC接口,前面小结提到的2、3、4点都会涉及到,代码中已经添加详细注释:
package com.bolingcavalry.grpctutorials;import com.bolingcavalry.grpctutorials.lib.AddCartReply;
import com.bolingcavalry.grpctutorials.lib.CartServiceGrpc;
import com.bolingcavalry.grpctutorials.lib.ProductOrder;
import io.grpc.stub.StreamObserver;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import net.devh.boot.grpc.client.inject.GrpcClient;
import org.springframework.stereotype.Service;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;@Service
@Slf4j
public class GrpcClientService {@GrpcClient("client-stream-server-side")private CartServiceGrpc.CartServiceStub cartServiceStub;public String addToCart(int count) {CountDownLatch countDownLatch = new CountDownLatch(1);// responseObserver的onNext和onCompleted会在另一个线程中被执行,// ExtendResponseObserver继承自StreamObserverExtendResponseObserver<AddCartReply> responseObserver = new ExtendResponseObserver<AddCartReply>() {String extraStr;@Overridepublic String getExtra() {return extraStr;}private int code;private String message;@Overridepublic void onNext(AddCartReply value) {log.info("on next");code = value.getCode();message = value.getMessage();}@Overridepublic void onError(Throwable t) {log.error("gRPC request error", t);extraStr = "gRPC error, " + t.getMessage();countDownLatch.countDown();}@Overridepublic void onCompleted() {log.info("on complete");extraStr = String.format("返回码[%d],返回信息:%s" , code, message);countDownLatch.countDown();}};// 远程调用,此时数据还没有给到服务端StreamObserver<ProductOrder> requestObserver = cartServiceStub.addToCart(responseObserver);for(int i=0; i<count; i++) {// 发送一笔数据到服务端requestObserver.onNext(build(101 + i, 1 + i));}// 客户端告诉服务端:数据已经发完了requestObserver.onCompleted();try {// 开始等待,如果服务端处理完成,那么responseObserver的onCompleted方法会在另一个线程被执行,// 那里会执行countDownLatch的countDown方法,一但countDown被执行,下面的await就执行完毕了,// await的超时时间设置为2秒countDownLatch.await(2, TimeUnit.SECONDS);} catch (InterruptedException e) {log.error("countDownLatch await error", e);}log.info("service finish");// 服务端返回的内容被放置在requestObserver中,从getExtra方法可以取得return responseObserver.getExtra();}/*** 创建ProductOrder对象* @param productId* @param num* @return*/private static ProductOrder build(int productId, int num) {return ProductOrder.newBuilder().setProductId(productId).setNumber(num).build();}
}
  • 最后做个web接口,可以通过web请求验证远程调用:
package com.bolingcavalry.grpctutorials;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;@RestController
public class GrpcClientController {@Autowiredprivate GrpcClientService grpcClientService;@RequestMapping("/")public String printMessage(@RequestParam(defaultValue = "1") int count) {return grpcClientService.addToCart(count);}
}
  • 编码完成,开始验证;

验证

  • 启动服务端ClientStreamServerSideApplication:
  • 启动客户端ClientStreamClientSideApplication:
  • 浏览器输入http://localhost:8082/?count=100,响应如下,可见远程调用gRPC服务成功:
  • 下面是服务端日志,可见逐一处理了客户端的每一笔数据:
  • 下面是客户端日志,可见由于CountDownLatch的作用,发起gRPC请求的线程一直等待responseObserver.onCompleted在另一个线程被执行完后,才会继续执行:
  • 至此,客户端流类型的gRPC服务及其客户端开发就完成了,这种异步操作与咱们平时开发同步类型的web接口还是有差别的,希望本文能给您带来一些参考,下一篇咱们实战最后一种类型:双向流式;

你不孤单,欣宸原创一路相伴

  1. Java系列
  2. Spring系列
  3. Docker系列
  4. kubernetes系列
  5. 数据库+中间件系列
  6. DevOps系列

欢迎关注公众号:程序员欣宸

微信搜索「程序员欣宸」,我是欣宸,期待与您一同畅游Java世界…

java版gRPC实战之四:客户端流相关推荐

  1. java版gRPC实战之五:双向流

    欢迎访问我的GitHub 这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos <java版gRPC实战>全系列链接 用p ...

  2. java版gRPC实战之三:服务端流

    欢迎访问我的GitHub 这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos <java版gRPC实战>全系列链接 用p ...

  3. java版gRPC实战之一:用proto生成代码

    欢迎访问我的GitHub 这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos 关于<java版gRPC实战>系列 < ...

  4. java版gRPC实战之五:双向流,rabbitmq持久化原理

    responseObserver.onNext(DeductReply.newBuilder() .setCode(code) .setMessage(message) .build()); } @O ...

  5. gRPC的通信方式-客户端流式、服务端流式、双向流式在Java的调用示例

    场景 gPRC简介以及Java中使用gPRC实现客户端与服务端通信(附代码下载): https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/ ...

  6. java与es8实战之四:SpringBoot应用中操作es8(无安全检查)

    欢迎访问我的GitHub 这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos 本篇概览 本篇是<java与es8实战>系 ...

  7. java与es8实战之五:SpringBoot应用中操作es8(带安全检查:https、账号密码、API Key)

    欢迎访问我的GitHub 这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos 本篇概览 本篇是<java与es8实战>系 ...

  8. 【Golang | gRPC】gRPC-Client Streaming客户端流实战

    环境: Golang: go1.18.2 windows/amd64 grpc: v1.47.0 protobuf: v1.28.0 完整代码: https://github.com/WanshanT ...

  9. grpc java_grpc详解 java版

    grpc 详解 java版 Java中gRPC的基本教程介绍. 本教程提供了有关使用gRPC的基本Java程序员介绍. 通过遍历此示例,您将学习如何: grpc设计的核心概念. 在.proto文件中定 ...

最新文章

  1. 正确“假期休息模式”
  2. Depth-aware CNN
  3. PHP 页面静态化/纯静态化/伪静态化
  4. 如何设置mysql的权限_mysql 权限控制
  5. 远程图片保存到服务器 php,保存远程图片到本地服务器几种方法[php,asp]网
  6. 5道谷歌面试题:即使是天才也要怀疑自己能力了(附答案)
  7. 什么叫轻量瓷_为什么说陶瓷是华夏文明的徽章?
  8. C#LeetCode刷题-几何
  9. 常见python爬虫框架_常用高效的Python爬虫框架
  10. POJ3292 UVA11105 Semi-prime H-numbers【筛法打表】
  11. 兰花草c语言编码蜂鸣器,蜂鸣器奏乐-多种音乐分享
  12. protel 99se快捷键 总结
  13. android购物车计数器,Axure教程:计数器的实战应用场景 —— 商品购件数
  14. Matlab fprintf
  15. matlab绘制凸多面体,科学网—MATLAB中计算凸多边形面积和凸多面体体积的简便方法 - 王福昌的博文...
  16. 408真题-2022
  17. 英语词根词缀记忆法(全集)_掌握了英语词根词缀,就能迅速记下单词,快速提升词汇量...
  18. APP服务器被恶意攻击怎么办?
  19. 2021英国硕士计算机专业排名,2020年伦敦国王学院先进的计算机专业硕士申请条件-学费-世界排名...
  20. Auto.js调试:使用雷电模拟器的网络模式进行调试

热门文章

  1. 文化部声明:网游已经划归文化部管理
  2. MATLAB对图形做数值变换
  3. 计算机科学与技术专业大几分流,2019级计算机大类专业分流实施细则
  4. 强度测试与压力测试的理解和区别
  5. 【题解】POJ 1852 Ants(搜索)
  6. Ns2简单有限网络仿真实验
  7. 申宝正规股票探底回升巨震洗盘
  8. 【递归】两道程序题理解递归
  9. 数学分析(4): 函数的连续性
  10. 面向初学者的 SQL Server 查询执行计划(1)——聚集索引运算符(Clustered Index)