Grpc系列二 Grpc4种服务方法的定义和实现
1. 概述
1.1 服务定义
向其它的RPC服务一样,GPRC的基础是服务的定义。服务定义远程调用方法的名称、传入参数和返回参数。GRPC默认使用 Protobuf描述服务,protobuf的信息见这篇博客Protobuf3 的第一个Java demo
GRPC一共定义4种服务方法:
- 一元RPC(Unary RPCs ):这是最简单的定义,客户端发送一个请求,服务端返回一个结果
- 服务器流RPC(Server streaming RPCs):客户端发送一个请求,服务端返回一个流给客户端,客户从流中读取一系列消息,直到读取所有小心
- 客户端流RPC(Client streaming RPCs ):客户端通过流向服务端发送一系列消息,然后等待服务端读取完数据并返回处理结果
双向流RPC(Bidirectional streaming RPCs):客户端和服务端都可以独立向对方发送或接受一系列的消息。客户端和服务端读写的顺序是任意。
以上的服务方法定义在proto文件,如下:
syntax = "proto3";option java_multiple_files = true;
option java_package = "com.hry.spring.grpc.mystream";
option java_outer_classname = "HelloStreamEntity";service HelloStream {// A Unary RPC.rpc simpleRpc(Simple) returns (SimpleFeature) {}// A server-to-client streaming RPC.rpc server2ClientRpc(SimpleList) returns (stream SimpleFeature) {}// A client-to-server streaming RPC.rpc client2ServerRpc(stream Simple) returns (SimpleSummary) {}// A Bidirectional streaming RPC.rpc bindirectionalStreamRpc(stream Simple) returns (stream Simple) {}
}message Simple {int32 num = 1;string name = 2;
}message SimpleList {repeated Simple simpleList = 1;
}message SimpleFeature {string name = 1;Simple location = 2;
}message SimpleSummary {int32 feature_count = 2;
}// 测试类
message SimpleFeatureDatabase {repeated SimpleFeature feature = 1;
}
1.2 同步RPC和异步RPC
GRPC 同时支持同步RPC和异步RPC。
同步RPC调用服务方法只支持流RPC(Server streaming RPCs)和一元RPC(Unary RPCs )。异步RPC调用服务方法支持4种方法。
1.3 生成基础代码 ###
参考的Grpc系列一 第一个hello world 例子生成protobuf基础类和HelloStreamGrpc,详细见这里代码
2. 测试代码
这里实现服务端和客户端代码
2.1 服务端
服务端类的实现,通过继承HelloStreamGrpc.HelloStreamImplBase实现,具体服务接口实现见下面:
/*** 服务端类的实现**/private static class HelloStreamService extends HelloStreamGrpc.HelloStreamImplBase {private final List<SimpleFeature> features;public HelloStreamService(List<SimpleFeature> features) {this.features = features;}...
- 一元RPC(Unary RPCs )服务端实现:
@Overridepublic void simpleRpc(Simple request, StreamObserver<SimpleFeature> responseObserver) {SimpleFeature rtn = SimpleFeature.newBuilder().setName(request.getName() + "simpleRpc").setLocation(request).build();logger.info("recevier simpleRpc : {}", request);responseObserver.onNext(rtn);responseObserver.onCompleted();}
- 服务器流RPC(Server streaming RPCs)服务端实现:
@Overridepublic void server2ClientRpc(SimpleList request, StreamObserver<SimpleFeature> responseObserver) {logger.info("recevier server2ClientRpc : {}", request);for (SimpleFeature feature : this.features) {Simple simpleLocation = feature.getLocation();for (Simple o : request.getSimpleListList()) {if (o.getNum() == simpleLocation.getNum()) {// 推送记录responseObserver.onNext(feature);}}}responseObserver.onCompleted();}
- 客户端流RPC(Client streaming RPCs )服务端实现:
/*** 接收完所有的请求后,才返回一个对象*/@Overridepublic StreamObserver<Simple> client2ServerRpc(StreamObserver<SimpleSummary> responseObserver) {return new StreamObserver<Simple>() {int feature_count = 0;@Overridepublic void onNext(Simple value) {// 接收请求logger.info("num={}, client2ServerRpc, content={} ", feature_count, value);feature_count++;}@Overridepublic void onError(Throwable t) {logger.error("Simple cancelled, e={}", t);}@Overridepublic void onCompleted() {logger.info("onCompleted");// 接收所有请求后,返回总数SimpleSummary summary = SimpleSummary.newBuilder().setFeatureCount(feature_count).build();responseObserver.onNext(summary);// 结束请求responseObserver.onCompleted();}};}
- 双向流RPC(Bidirectional streaming RPCs)服务端实现:
/*** 每接收一个请求,立即返回一个对象*/@Overridepublic StreamObserver<Simple> bindirectionalStreamRpc(StreamObserver<Simple> responseObserver) {return new StreamObserver<Simple>() {@Overridepublic void onNext(Simple value) {logger.info("bindirectionalStreamRpc receive {}", value);for (SimpleFeature feature : features) {Simple simpleLocation = feature.getLocation();if (value.getNum() == simpleLocation.getNum()) {// 接收请求后,马上推送记录Simple rtn = Simple.newBuilder().setName(feature.getName() + "rtn").setNum(feature.getLocation().getNum()).build();responseObserver.onNext(rtn);}}}@Overridepublic void onError(Throwable t) {logger.error("bindirectionalStreamRpc cancelled, e={}", t);}@Overridepublic void onCompleted() {responseObserver.onCompleted();}};}
启动服务端main方法:
private static final Logger logger = LoggerFactory.getLogger(HelloStreamServer.class);private final int port;private final Server server;public HelloStreamServer(int port) throws IOException {this.port = port;this.server = ServerBuilder.forPort(port).addService(new HelloStreamService(HelloUtil.parseFeatures())).build();}// 启动服务public void start() throws IOException {server.start();logger.info("Server started, listening on " + port);Runtime.getRuntime().addShutdownHook(new Thread() {@Overridepublic void run() {System.err.println("*** shutting down gRPC server since JVM is shutting down");HelloStreamServer.this.stop();System.err.println("*** server shut down");}});}// 启动服务public void stop() {if (server != null) {server.shutdown();}}/*** Await termination on the main thread since the grpc library uses daemon* threads.*/private void blockUntilShutdown() throws InterruptedException {if (server != null) {server.awaitTermination();}}public static void main(String[] args) throws Exception {java.util.logging.Logger.getGlobal().setLevel(java.util.logging.Level.OFF);HelloStreamServer server = new HelloStreamServer(8980);server.start();server.blockUntilShutdown();}
2.2 客户端
客户端实现代码:创建客户端stub类
HelloStreamBlockingStub blockingStub:阻塞客户端,支持简单一元服务和流输出调用服务
HelloStreamStub asyncStub:异步客户端,支持所有类型调用
public class HelloStreamClient {private static final Logger logger = LoggerFactory.getLogger(HelloStreamClient.class);private final ManagedChannel channel;private final HelloStreamBlockingStub blockingStub;private final HelloStreamStub asyncStub;private Random random = new Random();public HelloStreamClient(String host, int port) {ManagedChannelBuilder<?> channelBuilder = ManagedChannelBuilder.forAddress(host, port).usePlaintext(true);channel = channelBuilder.build();// 创建一个阻塞客户端,支持简单一元服务和流输出调用服务blockingStub = HelloStreamGrpc.newBlockingStub(channel);// 创建一个异步客户端,支持所有类型调用asyncStub = HelloStreamGrpc.newStub(channel);}public void shutdown() throws InterruptedException {channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);}......// 创建Simple对象private Simple newSimple(int num) {return Simple.newBuilder().setName("simple" + num).setNum(num).build();}}
- 一元RPC(Unary RPCs )客户端实现:
/*** 一元服务调用*/public void simpleRpc(int num) {logger.info("request simpleRpc: num={}", num);Simple simple = Simple.newBuilder().setName("simpleRpc").setNum(num).build();SimpleFeature feature;try {feature = blockingStub.simpleRpc(simple);} catch (StatusRuntimeException e) {logger.info("RPC failed: {0}", e.getStatus());return;}logger.info("simpleRpc end called {}", feature);}
- 服务器流RPC(Server streaming RPCs)客户端实现:
/*** 阻塞服务器流*/public void server2ClientRpc(int num1, int num2) {logger.info("request server2ClientRpc num1={}, num2={}", num1, num2);Simple simple = Simple.newBuilder().setName("simple" + num1).setNum(num1).build();Simple simple2 = Simple.newBuilder().setName("simple" + num2).setNum(num2).build();SimpleList simpleList = SimpleList.newBuilder().addSimpleList(simple).addSimpleList(simple2).build();Iterator<SimpleFeature> simpleFeatureIter = blockingStub.server2ClientRpc(simpleList);for (int i = 1; simpleFeatureIter.hasNext(); i++) {SimpleFeature feature = simpleFeatureIter.next();logger.info("Result {} : {}", i, feature);}}
- 客户端流RPC(Client streaming RPCs )客户端实现:
/*** 异步客户端流* */public void client2ServerRpc(int count) throws InterruptedException {logger.info("request client2ServerRpc {}", count);final CountDownLatch finishLatch = new CountDownLatch(1);StreamObserver<SimpleSummary> responseObserver = new StreamObserver<SimpleSummary>() {@Overridepublic void onNext(SimpleSummary value) {// 返回SimpleSummarylogger.info("client2ServerRpc onNext : {}", value);}@Overridepublic void onError(Throwable t) {logger.error("client2ServerRpc error : {}", t);finishLatch.countDown();}@Overridepublic void onCompleted() {logger.error("client2ServerRpc finish");finishLatch.countDown();}};StreamObserver<Simple> requestObserver = asyncStub.client2ServerRpc(responseObserver);try {for (int i = 0; i < count; i++) {logger.info("simple : {}", i);Simple simple = Simple.newBuilder().setName("client2ServerRpc" + i).setNum(i).build();requestObserver.onNext(simple);Thread.sleep(random.nextInt(200) + 50);}} catch (RuntimeException e) {// Cancel RPCrequestObserver.onError(e);throw e;}// 结束请求requestObserver.onCompleted();// Receiving happens asynchronouslyif (!finishLatch.await(1, TimeUnit.MINUTES)) {logger.error("client2ServerRpc can not finish within 1 minutes");}}
- 双向流RPC(Bidirectional streaming RPCs)客户端实现:
/*** 双向流* * @throws InterruptedException*/public void bindirectionalStreamRpc() throws InterruptedException {logger.info("request bindirectionalStreamRpc");final CountDownLatch finishLatch = new CountDownLatch(1);StreamObserver<Simple> requestObserver = asyncStub.bindirectionalStreamRpc(new StreamObserver<Simple>() {@Overridepublic void onNext(Simple value) {logger.info("bindirectionalStreamRpc receive message : {}", value);}@Overridepublic void onError(Throwable t) {logger.error("bindirectionalStreamRpc Failed: {0}", Status.fromThrowable(t));finishLatch.countDown();}@Overridepublic void onCompleted() {logger.info("Finished bindirectionalStreamRpc");finishLatch.countDown();}});try {Simple[] requests = { newSimple(1), newSimple(2), newSimple(3), newSimple(4) };for (Simple request : requests) {logger.info("Sending message {}", request);requestObserver.onNext(request);}} catch (RuntimeException e) {// Cancel RPCrequestObserver.onError(e);throw e;}requestObserver.onCompleted();if (!finishLatch.await(1, TimeUnit.MINUTES)) {logger.error("routeChat can not finish within 1 minutes");}}
启动客户端
public static void main(String[] args) throws InterruptedException {HelloStreamClient client = new HelloStreamClient("localhost", 8980);try {// simple rpcclient.simpleRpc(1);// server2ClientRpcclient.server2ClientRpc(1, 2);// client2ServerRpcclient.client2ServerRpc(2);// bindirectionalStreamRpcclient.bindirectionalStreamRpc();} finally {client.shutdown();}}
3. 代码
详细代码见这里Github
Grpc系列二 Grpc4种服务方法的定义和实现相关推荐
- android 原生开发 3d地图 下载_arcgis api 3.x for js 入门开发系列二不同地图服务展示(附源码下载)...
前言 关于本篇功能实现用到的 api 涉及类看不懂的,请参照 esri 官网的 arcgis api 3.x for js:esri 官网 api,里面详细的介绍 arcgis api 3.x 各个类 ...
- 【指数编制系列二】数据标准化方法
在系统学习指数编制方法之前,先介绍一下几个指数编制过程中会经常使用的数据处理方法,如:数据标准化方法.权重设置方法.异常值处理方法.因为在后面指数编制过程中会经常用到这些方法.接下来我还是按照分类 ...
- Kafka系列二——消息发送send方法源码分析
文章目录 一.send使用说明 1.1 客户端代码 1.2 ProducerRecord 二.发送过程 2.1 send 2.2 doSend关键代码 2.2.1 RecordAccumulator原 ...
- arcgis api 3.x for js 入门开发系列二不同地图服务展示(附源码下载)
前言 关于本篇功能实现用到的 api 涉及类看不懂的,请参照 esri 官网的 arcgis api 3.x for js:esri 官网 api,里面详细的介绍 arcgis api 3.x 各个类 ...
- gRPC系列(三) 如何借助HTTP2实现传输
本系列分为四大部分: gRPC系列(一) 什么是RPC? gRPC系列(二) 如何用Protobuf组织内容 gRPC系列(三)如何借助HTTP2实现传输 gRPC系列(四) 框架如何赋能分布式系统 ...
- gRPC 基础(二)-- Go 语言版 gRPC-Go
gRPC-Go Github gRPC的Go实现:一个高性能.开源.通用的RPC框架,将移动和HTTP/2放在首位.有关更多信息,请参阅Go gRPC文档,或直接进入快速入门. 一.快速入门 本指南通 ...
- grpc原理及四种实现方式
文章目录 1. rpc概述 1.1 rpc和http区别 2. grpc介绍 调用过程 2.1. 使用原理 2.2 服务定义 2.3. 同步与异步 3. rpc的种类 3.1 一元 RPC 3.2 服 ...
- css垂直居中最常用的八种布局方法
css垂直居中最常用的八种布局方法 首先定义两个盒子,然后进行下布局样式操作! 利用CSS进行元素的水平居中,比较简单,行级元素设置其父元素的text-align center 块级元素设置其本身的l ...
- 微服务架构系列二:密码强度评测的实现与实验
本文是继<微服务架构系列一:关键技术与原理研究>的后续,系列一中论述了微服务研究的背景和意义,主要调研了传统架构的发展以及存在的问题和微服务架构的由来,然后针对微服务架构的设计原则.容器技 ...
最新文章
- 朴素高精度乘法的常数优化
- More Effective C++:理解new和delete
- python集合中的元素不允许重复对吗_python字典中的值为什么不允许重复
- mysql报错注入_关于Mysql注入过程中的三种报错方式
- linux ssh命令 带密码,[命令] Linux 命令 sshpass(密码非交互式 ssh)(转载)
- itext 添加空格_借助 iText 用代码在 PDF 中创建空白签名域
- html在线压缩tar.gz源码,c50_melp.tar.gz美国2400语音压缩编码算法,文件…
- C运行库和C语言函数库/Microsoft C运行库
- 什么时候加上android.intent.category.DEFAULT和LAUNCHER
- Redis和Memcached:数据类型 过期策略 持久策略 虚拟内存 Value大小
- Maven解决jar包版本冲突
- Rider 2021.3 Beta 现已推出
- php v-for=,Vue中v-for循环节点的实现代码
- 8. String to Integer[M]字符串转整数
- errgroup 分析
- JavaSE基础——网络编程
- 什么是Pagerank?Pagerank算法介绍与计算公式
- 2022爱分析· 工业互联网厂商全景报告
- 主数据同步与分发实现
- 基于Python的宋词生成器
热门文章
- python numpy中fromfile函数的使用
- alipay 网站支付
- 基于JavaEE的IT威客网站管理系统_JSP网站设计_SqlServer数据库设计
- eclipse自动关闭解决方案
- SpringBoot2+MybatisPlus3入门手册v2修订版
- 全球最年长程序员,84岁硬核老奶奶的励志编程路
- 模拟如何渲染100000条数据
- 在同一Android应用程序内,信息安全技术题库:Android中同一个应用程序的所有进程可以属于不同用户。()...
- 最新《IT营Angular5 Angular4.X入门实战》
- 补充:爬虫技术成就了这些商业公司的