gRPC Streaming的操作对象由服务端和客户端组成。在一个包含了多个不同服务的集群环境中可能需要从一个服务里调用另一个服务端提供的服务。这时调用服务端又成为了提供服务端的客户端了(服务消费端)。那么如果我们用streaming形式来提交服务需求及获取计算结果就是以一个服务端为Source另一个服务端为通过式passthrough Flow的stream运算了。讲详细点就是请求方用需求构建Source,以连接Flow的方式把需求传递给服务提供方。服务提供方在Flow内部对需求进行处理后再把结果返回来,请求方run这个连接的stream应该就可以得到需要的结果了。下面我们就针对以上场景在一个由JDBC,Cassandra,MongoDB几种gRPC服务组成的集群环境里示范在这几个服务之间的stream连接和运算。

首先,我们设计一个简单但比较有代表性的例子:从JDBC的客户端传一个字符型消息hello给JDBC服务端、JDBC服务端在hello后面添加“,from jdbc to cassandra”然后通过Cassandra客户端把消息当作请求传给Cassandra服务端、Cassandra服务端在消息后面再加上“,from cassandra to mongo”并通过MongoDB客户端把消息传给MongoDB服务端、最后MongoDB服务端在消息后面添加“,mongo says hi”。整个stream的形状是 jdbc-client->jdbc-service->cassandra-service-mongodb-service。如果run这个stream得到的结果应该是一个描述完整移动路径的消息。从请求-服务角度来描述:我们可以把每个节点消息更新处理当作某种完整的数据处理过程。

以下分别是JDBC,Cassandra,MongoDB gRPC IDL定义:

service JDBCServices {rpc greeting(stream HelloMsg) returns (stream HelloMsg) {}
}service CQLServices {rpc greeting(stream HelloMsg) returns (stream HelloMsg) {}
}service MGOServices {rpc greeting(stream HelloMsg) returns (stream HelloMsg) {}
}

三个服务共用了protobuf消息类型HelloMsg。我们把共用的消息统一放到一个common.proto文件里:

syntax = "proto3";package sdp.grpc.services;message HelloMsg {string hello = 1;
}message DataRow {string countyname = 1;string statename = 2;int32 reportyear = 3;int32 value = 4;
}

然后在示范应用的.proto文件中用import 把所有protobuf,gRPC服务定义都集中起来:

syntax = "proto3";import "google/protobuf/wrappers.proto";
import "google/protobuf/any.proto";
import "scalapb/scalapb.proto";option (scalapb.options) = {// use a custom Scala package name// package_name: "io.ontherocks.introgrpc.demo"// don't append file name to packageflat_package: true// generate one Scala file for all messages (services still get their own file)single_file: true// add imports to generated file// useful when extending traits or using custom types// import: "io.ontherocks.hellogrpc.RockingMessage"// code to put at the top of generated file// works only with `single_file: true`//preamble: "sealed trait SomeSealedTrait"
};/** Demoes various customization options provided by ScalaPBs.*/package sdp.grpc.services;import "misc/sdp.proto";
import "common.proto";
import "cql/cql.proto";
import "jdbc/jdbc.proto";
import "mgo/mgo.proto";

下面我们把最核心的服务实现挑出来讲解一下,先看看Cassandra服务的实现:

import sdp.grpc.mongo.client.MGOClientclass CQLStreamingServices(implicit ec: ExecutionContextExecutor,mat: ActorMaterializer,  session: Session)extends CqlGrpcAkkaStream.CQLServices with LogSupport{val mongoClient = new MGOClientval stub = mongoClient.stubdef sayHelloTo(msg: String): Flow[HelloMsg, HelloMsg, NotUsed] =Flow[HelloMsg].map { r => HelloMsg(r.hello + msg)}.via(stub.greeting)override def greeting: Flow[HelloMsg, HelloMsg, NotUsed] =Flow[HelloMsg].via(sayHelloTo(",from cassandra to mongo"))}

streaming方式的gRPC服务其实就是一个akka-stream的Flow[R1,R2,M],它把收到的数据R1处理后转换成R2输出。在处理R1的环节里可能会需要其它服务的运算结果。在以上例子里CQLService把收到的消息加工转换后传给MGOService并等待MGOService再深度加工返还的结果,所以sayHelloTo还是一个有两个节点的Flow:在第一个节点中对收到的消息进行加工,第二个节点把加工的消息传给另一个服务并连接它的运算结果作为本身最终的输出。调用其它跨集群节点的服务必须经该服务的gRPC客户端进行,这里调用的MGOClient:

package sdp.grpc.mongo.clientimport sdp.grpc.services._
import sdp.logging.LogSupport
import io.grpc._
import common._
import sdp.grpc.services._
import akka.stream.scaladsl._
import akka.NotUsedclass MGOClient extends LogSupport {val channel = ManagedChannelBuilder.forAddress("localhost", 50051).usePlaintext().build()val stub = MgoGrpcAkkaStream.stub(channel)}

JDBCService连接CQLService, CQLService连接MGOService:

import sdp.grpc.cassandra.client.CQLClientclass JDBCStreamingServices(implicit ec: ExecutionContextExecutor)extends JdbcGrpcAkkaStream.JDBCServices with LogSupport {val cassandraClient = new CQLClientval stub = cassandraClient.stubdef sayHelloTo(msg: String): Flow[HelloMsg,HelloMsg,NotUsed] =Flow[HelloMsg].map {r => HelloMsg(r.hello + msg)}.via(stub.greeting)override def greeting: Flow[HelloMsg, HelloMsg, NotUsed] =Flow[HelloMsg].via(sayHelloTo(",from jdbc to cassandra"))}

最后我们用DemoApp来示范整个过程:

package demo.sdp.grpcimport akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ThrottleMode}import sdp.grpc.jdbc.client.JDBCClientobject DemoApp extends App {implicit val system = ActorSystem("jdbcClient")implicit val mat = ActorMaterializer.create(system)implicit val ec = system.dispatcherval jdbcClient = new JDBCClientjdbcClient.sayHello.runForeach(r => println(r.hello))scala.io.StdIn.readLine()mat.shutdown()system.terminate()}

DemoApp调用了JDBCClient:

package sdp.grpc.jdbc.clientimport sdp.grpc.services._
import sdp.logging.LogSupport
import io.grpc._
import common._
import sdp.grpc.services._
import akka.stream.scaladsl._
import akka.NotUsedclass JDBCClient extends LogSupport {val channel = ManagedChannelBuilder.forAddress("localhost", 50053).usePlaintext().build()val stub = JdbcGrpcAkkaStream.stub(channel)def sayHello: Source[HelloMsg, NotUsed] = {val row = HelloMsg("hello ")val rows = List.fill[HelloMsg](100)(row)Source.fromIterator(() => rows.iterator).via(stub.greeting)}
}

运行DemoApp显示的结果:

hello ,from jdbc to cassandra,from cassandra to mongo, mongo says hi
hello ,from jdbc to cassandra,from cassandra to mongo, mongo says hi
hello ,from jdbc to cassandra,from cassandra to mongo, mongo says hi
hello ,from jdbc to cassandra,from cassandra to mongo, mongo says hi
hello ,from jdbc to cassandra,from cassandra to mongo, mongo says hi
...

转载于:https://www.cnblogs.com/tiger-xc/p/9660828.html

PICE(6):集群环境里多异类端点gRPC Streaming - Heterogeneous multi-endpoints gRPC streaming...相关推荐

  1. PICE(6):集群环境里多异类端点gRPC Streaming - Heterogeneous multi-endpoints gRPC streaming

    gRPC Streaming的操作对象由服务端和客户端组成.在一个包含了多个不同服务的集群环境中可能需要从一个服务里调用另一个服务端提供的服务.这时调用服务端又成为了提供服务端的客户端了(服务消费端) ...

  2. PICE(1):Programming In Clustered Environment - 集群环境内编程模式

    首先声明:标题上的所谓编程模式是我个人考虑在集群环境下跨节点(jvm)的流程控制编程模式,纯粹按实际需要构想,没什么理论支持.在5月份的深圳scala meetup上我分享了有关集群环境下的编程模式思 ...

  3. quartz在集群环境下的最终解决方案

    在集群环境下,大家会碰到一直困扰的问题,即多个 APP 下如何用 quartz 协调处理自动化 JOB . 大家想象一下,现在有 A , B , C3 台机器同时作为集群服务器对外统一提供 SERVI ...

  4. kafka 基础知识梳理及集群环境部署记录

    一.kafka基础介绍 Kafka是最初由Linkedin公司开发,是一个分布式.支持分区的(partition).多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特 ...

  5. 使用Docker搭建Elasticsearch集群环境

    本篇文章首发于头条号单机如何搭建Elasticsearch集群?使用容器技术快速构建集群环境,欢迎关注头条号和微信公众号"大数据技术和人工智能"(微信搜索bigdata_ai_te ...

  6. 『高级篇』docker之DockerSwarm的集群环境搭建(28)

    原创文章,欢迎转载.转载请注明:转载自IT人故事会,谢谢! 原文链接地址:『高级篇』docker之DockerSwarm的集群环境搭建(28) 上次了解了docker Swarm,这次一起动手操作,搭 ...

  7. APACHE-TOMCAT集群环境部署

    APACHE-TOMCAT集群环境部署 -----------------–Shanks ---------------------------------------- 本集群不涉及session同 ...

  8. Java技术分享:集群环境下的定时任务

    定时任务的实现方式有多种,例如JDK自带的Timer+TimerTask方式,Spring 3.0以后的调度任务(Scheduled Task),Quartz框架等. Timer+TimerTask是 ...

  9. 学习笔记之-Kubernetes(K8S)介绍,集群环境搭建,Pod详解,Pod控制器详解,Service详解,数据存储,安全认证,DashBoard

    笔记来源于观看黑马程序员Kubernetes(K8S)教程 第一章 kubernetes介绍 应用部署方式演变 在部署应用程序的方式上,主要经历了三个时代: 传统部署:互联网早期,会直接将应用程序部署 ...

最新文章

  1. 软件测试风险评估分析
  2. 20155313 2016-2017-2 《Java程序设计》第三周学习总结
  3. C语言阿斯码,木叶四位上忍设定各不相同,网红负责秀操作,她只需要美就够了...
  4. 青岛旅游学校计算机证书,【我和我的旅校】青岛旅游学校优秀毕业生郭千瑜
  5. 带有示例的Python列表reverse()方法
  6. 支持markdown的服务器,基于tornado实现的一个markdown解析服务器
  7. 好书一本:《设计心理学》
  8. Android最佳安全应用程序已出炉,Google Play Protect曝大冷门
  9. 应对互联网变局,这 8 件事必须要做。
  10. PC建立WIFI热点
  11. svn 分支上新增文件合并发生冲突_windows 下svn 创建分支 合并分支 冲突
  12. 下载SE78里面的图片
  13. office 中墨迹书写工具_word2016 墨迹书写 word2016墨迹书写工具
  14. android编程播放音乐,Android编程实现播放音频的方法示例
  15. 创建第一个SpringBoot项目
  16. 百度地图开发Sug检索Demo
  17. 牛顿迭代法求解根的问题
  18. 爬取qq好友说说并对数据简单分析
  19. 摩拜共享单车技术含量
  20. Latex输入罗马数字的最简便方法

热门文章

  1. 合泰HT66F2390单片机IIC使用例程(红外测温MLX90614模块的使用)
  2. web系统私有化部署方案
  3. node-sass安装失败、报错、解决办法总结
  4. java拼团小程序源码(毕设)
  5. 小型项目SSM+Maven实战讲解:APP信息管理平台-developer版
  6. window下如何设置屏幕扩展
  7. 1.8-19:肿瘤检测
  8. 用c#在excel中插入图片和设置表格宽度
  9. sony 摄像机使用
  10. OpenStreetMap 应用