1. 概述

1.1 服务定义

向其它的RPC服务一样,GPRC的基础是服务的定义。服务定义远程调用方法的名称、传入参数和返回参数。GRPC默认使用 Protobuf描述服务,protobuf的信息见这篇博客Protobuf3 的第一个Java demo

GRPC一共定义4种服务方法:

  1. 一元RPC(Unary RPCs ):这是最简单的定义,客户端发送一个请求,服务端返回一个结果
  2. 服务器流RPC(Server streaming RPCs):客户端发送一个请求,服务端返回一个流给客户端,客户从流中读取一系列消息,直到读取所有小心
  3. 客户端流RPC(Client streaming RPCs ):客户端通过流向服务端发送一系列消息,然后等待服务端读取完数据并返回处理结果
  4. 双向流RPC(Bidirectional streaming RPCs):客户端和服务端都可以独立向对方发送或接受一系列的消息。客户端和服务端读写的顺序是任意。

    以上的服务方法定义在proto文件,如下:

syntax = "proto3";option java_multiple_files = true;
option java_package = "com.hry.spring.grpc.mystream";
option java_outer_classname = "HelloStreamEntity";service HelloStream {// A Unary RPC.rpc simpleRpc(Simple) returns (SimpleFeature) {}// A server-to-client streaming RPC.rpc server2ClientRpc(SimpleList) returns (stream SimpleFeature) {}// A client-to-server streaming RPC.rpc client2ServerRpc(stream Simple) returns (SimpleSummary) {}// A Bidirectional streaming RPC.rpc bindirectionalStreamRpc(stream Simple) returns (stream Simple) {}
}message Simple {int32 num = 1;string name = 2;
}message SimpleList {repeated Simple simpleList = 1;
}message SimpleFeature {string name = 1;Simple location = 2;
}message SimpleSummary {int32 feature_count = 2;
}// 测试类
message SimpleFeatureDatabase {repeated SimpleFeature feature = 1;
}

1.2 同步RPC和异步RPC

GRPC 同时支持同步RPC和异步RPC。
同步RPC调用服务方法只支持流RPC(Server streaming RPCs)和一元RPC(Unary RPCs )。异步RPC调用服务方法支持4种方法。

1.3 生成基础代码 ###

参考的Grpc系列一 第一个hello world 例子生成protobuf基础类和HelloStreamGrpc,详细见这里代码

2. 测试代码

这里实现服务端和客户端代码

2.1 服务端

服务端类的实现,通过继承HelloStreamGrpc.HelloStreamImplBase实现,具体服务接口实现见下面:

/*** 服务端类的实现**/private static class HelloStreamService extends HelloStreamGrpc.HelloStreamImplBase {private final List<SimpleFeature> features;public HelloStreamService(List<SimpleFeature> features) {this.features = features;}...
  • 一元RPC(Unary RPCs )服务端实现:
        @Overridepublic void simpleRpc(Simple request, StreamObserver<SimpleFeature> responseObserver) {SimpleFeature rtn = SimpleFeature.newBuilder().setName(request.getName() + "simpleRpc").setLocation(request).build();logger.info("recevier simpleRpc : {}", request);responseObserver.onNext(rtn);responseObserver.onCompleted();}
  • 服务器流RPC(Server streaming RPCs)服务端实现:
        @Overridepublic void server2ClientRpc(SimpleList request, StreamObserver<SimpleFeature> responseObserver) {logger.info("recevier server2ClientRpc : {}", request);for (SimpleFeature feature : this.features) {Simple simpleLocation = feature.getLocation();for (Simple o : request.getSimpleListList()) {if (o.getNum() == simpleLocation.getNum()) {// 推送记录responseObserver.onNext(feature);}}}responseObserver.onCompleted();}
  • 客户端流RPC(Client streaming RPCs )服务端实现:
/*** 接收完所有的请求后,才返回一个对象*/@Overridepublic StreamObserver<Simple> client2ServerRpc(StreamObserver<SimpleSummary> responseObserver) {return new StreamObserver<Simple>() {int feature_count = 0;@Overridepublic void onNext(Simple value) {// 接收请求logger.info("num={}, client2ServerRpc, content={} ", feature_count, value);feature_count++;}@Overridepublic void onError(Throwable t) {logger.error("Simple cancelled, e={}", t);}@Overridepublic void onCompleted() {logger.info("onCompleted");// 接收所有请求后,返回总数SimpleSummary summary = SimpleSummary.newBuilder().setFeatureCount(feature_count).build();responseObserver.onNext(summary);// 结束请求responseObserver.onCompleted();}};}
  • 双向流RPC(Bidirectional streaming RPCs)服务端实现:
        /*** 每接收一个请求,立即返回一个对象*/@Overridepublic StreamObserver<Simple> bindirectionalStreamRpc(StreamObserver<Simple> responseObserver) {return new StreamObserver<Simple>() {@Overridepublic void onNext(Simple value) {logger.info("bindirectionalStreamRpc receive {}", value);for (SimpleFeature feature : features) {Simple simpleLocation = feature.getLocation();if (value.getNum() == simpleLocation.getNum()) {// 接收请求后,马上推送记录Simple rtn = Simple.newBuilder().setName(feature.getName() + "rtn").setNum(feature.getLocation().getNum()).build();responseObserver.onNext(rtn);}}}@Overridepublic void onError(Throwable t) {logger.error("bindirectionalStreamRpc cancelled, e={}", t);}@Overridepublic void onCompleted() {responseObserver.onCompleted();}};}

启动服务端main方法:

    private static final Logger logger = LoggerFactory.getLogger(HelloStreamServer.class);private final int port;private final Server server;public HelloStreamServer(int port) throws IOException {this.port = port;this.server = ServerBuilder.forPort(port).addService(new HelloStreamService(HelloUtil.parseFeatures())).build();}// 启动服务public void start() throws IOException {server.start();logger.info("Server started, listening on " + port);Runtime.getRuntime().addShutdownHook(new Thread() {@Overridepublic void run() {System.err.println("*** shutting down gRPC server since JVM is shutting down");HelloStreamServer.this.stop();System.err.println("*** server shut down");}});}// 启动服务public void stop() {if (server != null) {server.shutdown();}}/*** Await termination on the main thread since the grpc library uses daemon* threads.*/private void blockUntilShutdown() throws InterruptedException {if (server != null) {server.awaitTermination();}}public static void main(String[] args) throws Exception {java.util.logging.Logger.getGlobal().setLevel(java.util.logging.Level.OFF);HelloStreamServer server = new HelloStreamServer(8980);server.start();server.blockUntilShutdown();}

2.2 客户端

客户端实现代码:创建客户端stub类
HelloStreamBlockingStub blockingStub:阻塞客户端,支持简单一元服务和流输出调用服务
HelloStreamStub asyncStub:异步客户端,支持所有类型调用

public class HelloStreamClient {private static final Logger logger = LoggerFactory.getLogger(HelloStreamClient.class);private final ManagedChannel channel;private final HelloStreamBlockingStub blockingStub;private final HelloStreamStub asyncStub;private Random random = new Random();public HelloStreamClient(String host, int port) {ManagedChannelBuilder<?> channelBuilder = ManagedChannelBuilder.forAddress(host, port).usePlaintext(true);channel = channelBuilder.build();// 创建一个阻塞客户端,支持简单一元服务和流输出调用服务blockingStub = HelloStreamGrpc.newBlockingStub(channel);// 创建一个异步客户端,支持所有类型调用asyncStub = HelloStreamGrpc.newStub(channel);}public void shutdown() throws InterruptedException {channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);}......// 创建Simple对象private Simple newSimple(int num) {return Simple.newBuilder().setName("simple" + num).setNum(num).build();}}
  • 一元RPC(Unary RPCs )客户端实现:
    /*** 一元服务调用*/public void simpleRpc(int num) {logger.info("request simpleRpc: num={}", num);Simple simple = Simple.newBuilder().setName("simpleRpc").setNum(num).build();SimpleFeature feature;try {feature = blockingStub.simpleRpc(simple);} catch (StatusRuntimeException e) {logger.info("RPC failed: {0}", e.getStatus());return;}logger.info("simpleRpc end called {}", feature);}
  • 服务器流RPC(Server streaming RPCs)客户端实现:
    /*** 阻塞服务器流*/public void server2ClientRpc(int num1, int num2) {logger.info("request server2ClientRpc num1={}, num2={}", num1, num2);Simple simple = Simple.newBuilder().setName("simple" + num1).setNum(num1).build();Simple simple2 = Simple.newBuilder().setName("simple" + num2).setNum(num2).build();SimpleList simpleList = SimpleList.newBuilder().addSimpleList(simple).addSimpleList(simple2).build();Iterator<SimpleFeature> simpleFeatureIter = blockingStub.server2ClientRpc(simpleList);for (int i = 1; simpleFeatureIter.hasNext(); i++) {SimpleFeature feature = simpleFeatureIter.next();logger.info("Result {} : {}", i, feature);}}
  • 客户端流RPC(Client streaming RPCs )客户端实现:
    /*** 异步客户端流* */public void client2ServerRpc(int count) throws InterruptedException {logger.info("request client2ServerRpc {}", count);final CountDownLatch finishLatch = new CountDownLatch(1);StreamObserver<SimpleSummary> responseObserver = new StreamObserver<SimpleSummary>() {@Overridepublic void onNext(SimpleSummary value) {// 返回SimpleSummarylogger.info("client2ServerRpc onNext : {}", value);}@Overridepublic void onError(Throwable t) {logger.error("client2ServerRpc error : {}", t);finishLatch.countDown();}@Overridepublic void onCompleted() {logger.error("client2ServerRpc finish");finishLatch.countDown();}};StreamObserver<Simple> requestObserver = asyncStub.client2ServerRpc(responseObserver);try {for (int i = 0; i < count; i++) {logger.info("simple : {}", i);Simple simple = Simple.newBuilder().setName("client2ServerRpc" + i).setNum(i).build();requestObserver.onNext(simple);Thread.sleep(random.nextInt(200) + 50);}} catch (RuntimeException e) {// Cancel RPCrequestObserver.onError(e);throw e;}// 结束请求requestObserver.onCompleted();// Receiving happens asynchronouslyif (!finishLatch.await(1, TimeUnit.MINUTES)) {logger.error("client2ServerRpc can not finish within 1 minutes");}}
  • 双向流RPC(Bidirectional streaming RPCs)客户端实现:
    /*** 双向流* * @throws InterruptedException*/public void bindirectionalStreamRpc() throws InterruptedException {logger.info("request bindirectionalStreamRpc");final CountDownLatch finishLatch = new CountDownLatch(1);StreamObserver<Simple> requestObserver = asyncStub.bindirectionalStreamRpc(new StreamObserver<Simple>() {@Overridepublic void onNext(Simple value) {logger.info("bindirectionalStreamRpc receive message : {}", value);}@Overridepublic void onError(Throwable t) {logger.error("bindirectionalStreamRpc Failed: {0}", Status.fromThrowable(t));finishLatch.countDown();}@Overridepublic void onCompleted() {logger.info("Finished bindirectionalStreamRpc");finishLatch.countDown();}});try {Simple[] requests = { newSimple(1), newSimple(2), newSimple(3), newSimple(4) };for (Simple request : requests) {logger.info("Sending message {}", request);requestObserver.onNext(request);}} catch (RuntimeException e) {// Cancel RPCrequestObserver.onError(e);throw e;}requestObserver.onCompleted();if (!finishLatch.await(1, TimeUnit.MINUTES)) {logger.error("routeChat can not finish within 1 minutes");}}

启动客户端

    public static void main(String[] args) throws InterruptedException {HelloStreamClient client = new HelloStreamClient("localhost", 8980);try {// simple rpcclient.simpleRpc(1);// server2ClientRpcclient.server2ClientRpc(1, 2);// client2ServerRpcclient.client2ServerRpc(2);// bindirectionalStreamRpcclient.bindirectionalStreamRpc();} finally {client.shutdown();}}

3. 代码

详细代码见这里Github

Grpc系列二 Grpc4种服务方法的定义和实现相关推荐

  1. android 原生开发 3d地图 下载_arcgis api 3.x for js 入门开发系列二不同地图服务展示(附源码下载)...

    前言 关于本篇功能实现用到的 api 涉及类看不懂的,请参照 esri 官网的 arcgis api 3.x for js:esri 官网 api,里面详细的介绍 arcgis api 3.x 各个类 ...

  2. 【指数编制系列二】数据标准化方法

      在系统学习指数编制方法之前,先介绍一下几个指数编制过程中会经常使用的数据处理方法,如:数据标准化方法.权重设置方法.异常值处理方法.因为在后面指数编制过程中会经常用到这些方法.接下来我还是按照分类 ...

  3. Kafka系列二——消息发送send方法源码分析

    文章目录 一.send使用说明 1.1 客户端代码 1.2 ProducerRecord 二.发送过程 2.1 send 2.2 doSend关键代码 2.2.1 RecordAccumulator原 ...

  4. arcgis api 3.x for js 入门开发系列二不同地图服务展示(附源码下载)

    前言 关于本篇功能实现用到的 api 涉及类看不懂的,请参照 esri 官网的 arcgis api 3.x for js:esri 官网 api,里面详细的介绍 arcgis api 3.x 各个类 ...

  5. gRPC系列(三) 如何借助HTTP2实现传输

    本系列分为四大部分: gRPC系列(一) 什么是RPC? gRPC系列(二) 如何用Protobuf组织内容 gRPC系列(三)如何借助HTTP2实现传输 gRPC系列(四) 框架如何赋能分布式系统 ...

  6. gRPC 基础(二)-- Go 语言版 gRPC-Go

    gRPC-Go Github gRPC的Go实现:一个高性能.开源.通用的RPC框架,将移动和HTTP/2放在首位.有关更多信息,请参阅Go gRPC文档,或直接进入快速入门. 一.快速入门 本指南通 ...

  7. grpc原理及四种实现方式

    文章目录 1. rpc概述 1.1 rpc和http区别 2. grpc介绍 调用过程 2.1. 使用原理 2.2 服务定义 2.3. 同步与异步 3. rpc的种类 3.1 一元 RPC 3.2 服 ...

  8. css垂直居中最常用的八种布局方法

    css垂直居中最常用的八种布局方法 首先定义两个盒子,然后进行下布局样式操作! 利用CSS进行元素的水平居中,比较简单,行级元素设置其父元素的text-align center 块级元素设置其本身的l ...

  9. 微服务架构系列二:密码强度评测的实现与实验

    本文是继<微服务架构系列一:关键技术与原理研究>的后续,系列一中论述了微服务研究的背景和意义,主要调研了传统架构的发展以及存在的问题和微服务架构的由来,然后针对微服务架构的设计原则.容器技 ...

最新文章

  1. 朴素高精度乘法的常数优化
  2. More Effective C++:理解new和delete
  3. python集合中的元素不允许重复对吗_python字典中的值为什么不允许重复
  4. mysql报错注入_关于Mysql注入过程中的三种报错方式
  5. linux ssh命令 带密码,[命令] Linux 命令 sshpass(密码非交互式 ssh)(转载)
  6. itext 添加空格_借助 iText 用代码在 PDF 中创建空白签名域
  7. html在线压缩tar.gz源码,c50_melp.tar.gz美国2400语音压缩编码算法,文件…
  8. C运行库和C语言函数库/Microsoft C运行库
  9. 什么时候加上android.intent.category.DEFAULT和LAUNCHER
  10. Redis和Memcached:数据类型 过期策略 持久策略 虚拟内存 Value大小
  11. Maven解决jar包版本冲突
  12. Rider 2021.3 Beta 现已推出
  13. php v-for=,Vue中v-for循环节点的实现代码
  14. 8. String to Integer[M]字符串转整数
  15. errgroup 分析
  16. JavaSE基础——网络编程
  17. 什么是Pagerank?Pagerank算法介绍与计算公式
  18. 2022爱分析· 工业互联网厂商全景报告
  19. 主数据同步与分发实现
  20. 基于Python的宋词生成器

热门文章

  1. python numpy中fromfile函数的使用
  2. alipay 网站支付
  3. 基于JavaEE的IT威客网站管理系统_JSP网站设计_SqlServer数据库设计
  4. eclipse自动关闭解决方案
  5. SpringBoot2+MybatisPlus3入门手册v2修订版
  6. 全球最年长程序员,84岁硬核老奶奶的励志编程路
  7. 模拟如何渲染100000条数据
  8. 在同一Android应用程序内,信息安全技术题库:Android中同一个应用程序的所有进程可以属于不同用户。()...
  9. 最新《IT营Angular5 Angular4.X入门实战》
  10. 补充:爬虫技术成就了这些商业公司的